Trace across to_device messages.

This commit is contained in:
Jorik Schellekens 2019-07-29 11:37:10 +01:00
parent fa87004bc1
commit e126bf862a
2 changed files with 46 additions and 4 deletions

View file

@ -51,7 +51,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_new_messages_for_device_txn(txn): def get_new_messages_for_device_txn(txn):
sql = ( sql = (
"SELECT stream_id, message_json FROM device_inbox" "SELECT stream_id, message_json, context FROM device_inbox"
" WHERE user_id = ? AND device_id = ?" " WHERE user_id = ? AND device_id = ?"
" AND ? < stream_id AND stream_id <= ?" " AND ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC" " ORDER BY stream_id ASC"
@ -61,11 +61,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
sql, (user_id, device_id, last_stream_id, current_stream_id, limit) sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
) )
messages = [] messages = []
references = []
for row in txn: for row in txn:
stream_pos = row[0] stream_pos = row[0]
messages.append(json.loads(row[1])) messages.append(json.loads(row[1]))
references.append(
opentracing.extract_text_map(
json.loads(json.loads(row[2])["opentracing"])
)
)
if len(messages) < limit: if len(messages) < limit:
stream_pos = current_stream_id stream_pos = current_stream_id
with opentracing.start_active_span(
"do we have send??" # , child_of=references[0]
):
opentracing.set_tag("ref", references)
pass
return (messages, stream_pos) return (messages, stream_pos)
return self.runInteraction( return self.runInteraction(
@ -281,6 +292,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
allow_none=True, allow_none=True,
) )
if already_inserted is not None: if already_inserted is not None:
opentracing.log_kv({"message": "message already received"})
return return
# Add an entry for this message_id so that we know we've processed # Add an entry for this message_id so that we know we've processed
@ -294,6 +306,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
"received_ts": now_ms, "received_ts": now_ms,
}, },
) )
opentracing.log_kv(
{"message": "device message added to device_federation_inbox"}
)
# Add the messages to the approriate local device inboxes so that # Add the messages to the approriate local device inboxes so that
# they'll be sent to the devices when they next sync. # they'll be sent to the devices when they next sync.
@ -336,6 +351,13 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
messages_json_for_user[device] = message_json messages_json_for_user[device] = message_json
else: else:
if not devices: if not devices:
opentracing.log_kv(
{
"message": "No devices for user.",
"user_id": user_id,
"messages": messages_by_device,
}
)
continue continue
sql = ( sql = (
"SELECT device_id FROM devices" "SELECT device_id FROM devices"
@ -361,13 +383,17 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
sql = ( sql = (
"INSERT INTO device_inbox" "INSERT INTO device_inbox"
" (user_id, device_id, stream_id, message_json)" " (user_id, device_id, stream_id, message_json, context)"
" VALUES (?,?,?,?)" " VALUES (?,?,?,?,?)"
) )
rows = [] rows = []
# TODO: User whitelisting?
context = json.dumps(
{"opentracing": opentracing.active_span_context_as_string()}
)
for user_id, messages_by_device in local_by_user_then_device.items(): for user_id, messages_by_device in local_by_user_then_device.items():
for device_id, message_json in messages_by_device.items(): for device_id, message_json in messages_by_device.items():
rows.append((user_id, device_id, stream_id, message_json)) rows.append((user_id, device_id, stream_id, message_json, context))
txn.executemany(sql, rows) txn.executemany(sql, rows)

View file

@ -0,0 +1,16 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE device_inbox ADD context TEXT;