Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2017-03-09 10:40:47 +00:00
commit 929b005999
6 changed files with 187 additions and 69 deletions

View file

@ -0,0 +1,22 @@
#!/bin/bash
set -eux
: ${WORKSPACE:="$(pwd)"}
export WORKSPACE
export PYTHONDONTWRITEBYTECODE=yep
export SYNAPSE_CACHE_FACTOR=1
export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy
./jenkins/prepare_synapse.sh
./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
./dendron/jenkins/build_dendron.sh
./sytest/jenkins/prep_sytest_for_postgres.sh
./sytest/jenkins/install_and_run.sh \
--synapse-directory $WORKSPACE \
--dendron $WORKSPACE/dendron/bin/dendron \
--haproxy \

View file

@ -17,9 +17,3 @@ export SYNAPSE_CACHE_FACTOR=1
./sytest/jenkins/install_and_run.sh \ ./sytest/jenkins/install_and_run.sh \
--synapse-directory $WORKSPACE \ --synapse-directory $WORKSPACE \
--dendron $WORKSPACE/dendron/bin/dendron \ --dendron $WORKSPACE/dendron/bin/dendron \
--pusher \
--synchrotron \
--federation-reader \
--client-reader \
--appservice \
--federation-sender \

View file

@ -16,6 +16,7 @@ from synapse.api import errors
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util import stringutils from synapse.util import stringutils
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id, RoomStreamToken from synapse.types import get_domain_from_id, RoomStreamToken
from twisted.internet import defer from twisted.internet import defer
@ -34,10 +35,11 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_layer()
self._remote_edue_linearizer = Linearizer(name="remote_device_list")
self._edu_updater = DeviceListEduUpdater(hs, self)
self.federation.register_edu_handler( self.federation.register_edu_handler(
"m.device_list_update", self._incoming_device_list_update, "m.device_list_update", self._edu_updater.incoming_device_list_update,
) )
self.federation.register_query_handler( self.federation.register_query_handler(
"user_devices", self.on_federation_query_user_devices, "user_devices", self.on_federation_query_user_devices,
@ -299,58 +301,6 @@ class DeviceHandler(BaseHandler):
# and those that actually still share a room with the user # and those that actually still share a room with the user
defer.returnValue(users_who_share_room & possibly_changed) defer.returnValue(users_who_share_room & possibly_changed)
@measure_func("_incoming_device_list_update")
@defer.inlineCallbacks
def _incoming_device_list_update(self, origin, edu_content):
user_id = edu_content["user_id"]
device_id = edu_content["device_id"]
stream_id = edu_content["stream_id"]
prev_ids = edu_content.get("prev_id", [])
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
logger.warning("Got device list update edu for %r from %r", user_id, origin)
return
rooms = yield self.store.get_rooms_for_user(user_id)
if not rooms:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return
with (yield self._remote_edue_linearizer.queue(user_id)):
# If the prev id matches whats in our cache table, then we don't need
# to resync the users device list, otherwise we do.
resync = True
if len(prev_ids) == 1:
extremity = yield self.store.get_device_list_last_stream_id_for_remote(
user_id
)
logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
if extremity and prev_ids[0] and int(extremity) >= int(prev_ids[0]):
resync = False
if resync:
# Fetch all devices for the user.
result = yield self.federation.query_user_devices(origin, user_id)
stream_id = result["stream_id"]
devices = result["devices"]
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
yield self.notify_device_update(user_id, device_ids)
else:
# Simply update the single device, since we know that is the only
# change (becuase of the single prev_id matching the current cache)
content = dict(edu_content)
for key in ("user_id", "device_id", "stream_id", "prev_ids"):
content.pop(key, None)
yield self.store.update_remote_device_list_cache_entry(
user_id, device_id, content, stream_id,
)
yield self.notify_device_update(user_id, [device_id])
@defer.inlineCallbacks @defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id): def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
@ -376,3 +326,129 @@ def _update_device_from_client_ips(device, client_ips):
"last_seen_ts": ip.get("last_seen"), "last_seen_ts": ip.get("last_seen"),
"last_seen_ip": ip.get("ip"), "last_seen_ip": ip.get("ip"),
}) })
class DeviceListEduUpdater(object):
"Handles incoming device list updates from federation and updates the DB"
def __init__(self, hs, device_handler):
self.store = hs.get_datastore()
self.federation = hs.get_replication_layer()
self.clock = hs.get_clock()
self.device_handler = device_handler
self._remote_edu_linearizer = Linearizer(name="remote_device_list")
# user_id -> list of updates waiting to be handled.
self._pending_updates = {}
# Recently seen stream ids. We don't bother keeping these in the DB,
# but they're useful to have them about to reduce the number of spurious
# resyncs.
self._seen_updates = ExpiringCache(
cache_name="device_update_edu",
clock=self.clock,
max_len=10000,
expiry_ms=30 * 60 * 1000,
iterable=True,
)
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
prev_ids = edu_content.pop("prev_id", [])
prev_ids = [str(p) for p in prev_ids] # They may come as ints
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
logger.warning("Got device list update edu for %r from %r", user_id, origin)
return
rooms = yield self.store.get_rooms_for_user(user_id)
if not rooms:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return
self._pending_updates.setdefault(user_id, []).append(
(device_id, stream_id, prev_ids, edu_content)
)
yield self._handle_device_updates(user_id)
@measure_func("_incoming_device_list_update")
@defer.inlineCallbacks
def _handle_device_updates(self, user_id):
"Actually handle pending updates."
with (yield self._remote_edu_linearizer.queue(user_id)):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
return
resync = yield self._need_to_do_resync(user_id, pending_updates)
if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
result = yield self.federation.query_user_devices(origin, user_id)
stream_id = result["stream_id"]
devices = result["devices"]
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
else:
# Simply update the single device, since we know that is the only
# change (becuase of the single prev_id matching the current cache)
for device_id, stream_id, prev_ids, content in pending_updates:
yield self.store.update_remote_device_list_cache_entry(
user_id, device_id, content, stream_id,
)
yield self.device_handler.notify_device_update(
user_id, [device_id for device_id, _, _, _ in pending_updates]
)
self._seen_updates.setdefault(user_id, set()).update(
stream_id for _, stream_id, _, _ in pending_updates
)
@defer.inlineCallbacks
def _need_to_do_resync(self, user_id, updates):
"""Given a list of updates for a user figure out if we need to do a full
resync, or whether we have enough data that we can just apply the delta.
"""
seen_updates = self._seen_updates.get(user_id, set())
extremity = yield self.store.get_device_list_last_stream_id_for_remote(
user_id
)
stream_id_in_updates = set() # stream_ids in updates list
for _, stream_id, prev_ids, _ in updates:
if not prev_ids:
# We always do a resync if there are no previous IDs
defer.returnValue(True)
for prev_id in prev_ids:
if prev_id == extremity:
continue
elif prev_id in seen_updates:
continue
elif prev_id in stream_id_in_updates:
continue
else:
defer.returnValue(True)
stream_id_in_updates.add(stream_id)
defer.returnValue(False)

View file

@ -488,10 +488,6 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues) " AND ".join("%s = ?" % (k,) for k in keyvalues)
) )
sqlargs = values.values() + keyvalues.values() sqlargs = values.values() + keyvalues.values()
logger.debug(
"[SQL] %s Args=%s",
sql, sqlargs,
)
txn.execute(sql, sqlargs) txn.execute(sql, sqlargs)
if txn.rowcount == 0: if txn.rowcount == 0:
@ -506,10 +502,6 @@ class SQLBaseStore(object):
", ".join(k for k in allvalues), ", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues) ", ".join("?" for _ in allvalues)
) )
logger.debug(
"[SQL] %s Args=%s",
sql, keyvalues.values(),
)
txn.execute(sql, allvalues.values()) txn.execute(sql, allvalues.values())
return True return True

View file

@ -33,6 +33,13 @@ class DeviceStore(SQLBaseStore):
self._prune_old_outbound_device_pokes, 60 * 60 * 1000 self._prune_old_outbound_device_pokes, 60 * 60 * 1000
) )
self.register_background_index_update(
"device_lists_stream_idx",
index_name="device_lists_stream_user_id",
table="device_lists_stream",
columns=["user_id", "device_id"],
)
@defer.inlineCallbacks @defer.inlineCallbacks
def store_device(self, user_id, device_id, def store_device(self, user_id, device_id,
initial_device_display_name): initial_device_display_name):
@ -501,7 +508,7 @@ class DeviceStore(SQLBaseStore):
defer.returnValue(set(changed)) defer.returnValue(set(changed))
sql = """ sql = """
SELECT user_id FROM device_lists_stream WHERE stream_id > ? SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
""" """
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
defer.returnValue(set(row[0] for row in rows)) defer.returnValue(set(row[0] for row in rows))
@ -546,6 +553,16 @@ class DeviceStore(SQLBaseStore):
host, stream_id, host, stream_id,
) )
# Delete older entries in the table, as we really only care about
# when the latest change happened.
txn.executemany(
"""
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?
""",
[(user_id, device_id, stream_id) for device_id in device_ids]
)
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
table="device_lists_stream", table="device_lists_stream",

View file

@ -0,0 +1,17 @@
/* Copyright 2017 Vector Creations Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('device_lists_stream_idx', '{}');