diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 79bb0ea46d..7bd8896458 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -51,7 +51,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): def get_new_messages_for_device_txn(txn): sql = ( - "SELECT stream_id, message_json FROM device_inbox" + "SELECT stream_id, message_json, context FROM device_inbox" " WHERE user_id = ? AND device_id = ?" " AND ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" @@ -61,11 +61,22 @@ class DeviceInboxWorkerStore(SQLBaseStore): sql, (user_id, device_id, last_stream_id, current_stream_id, limit) ) messages = [] + references = [] for row in txn: stream_pos = row[0] messages.append(json.loads(row[1])) + references.append( + opentracing.extract_text_map( + json.loads(json.loads(row[2])["opentracing"]) + ) + ) if len(messages) < limit: stream_pos = current_stream_id + with opentracing.start_active_span( + "do we have send??" # , child_of=references[0] + ): + opentracing.set_tag("ref", references) + pass return (messages, stream_pos) return self.runInteraction( @@ -281,6 +292,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): allow_none=True, ) if already_inserted is not None: + opentracing.log_kv({"message": "message already received"}) return # Add an entry for this message_id so that we know we've processed @@ -294,6 +306,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): "received_ts": now_ms, }, ) + opentracing.log_kv( + {"message": "device message added to device_federation_inbox"} + ) # Add the messages to the approriate local device inboxes so that # they'll be sent to the devices when they next sync. @@ -336,6 +351,13 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): messages_json_for_user[device] = message_json else: if not devices: + opentracing.log_kv( + { + "message": "No devices for user.", + "user_id": user_id, + "messages": messages_by_device, + } + ) continue sql = ( "SELECT device_id FROM devices" @@ -361,13 +383,17 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): sql = ( "INSERT INTO device_inbox" - " (user_id, device_id, stream_id, message_json)" - " VALUES (?,?,?,?)" + " (user_id, device_id, stream_id, message_json, context)" + " VALUES (?,?,?,?,?)" ) rows = [] + # TODO: User whitelisting? + context = json.dumps( + {"opentracing": opentracing.active_span_context_as_string()} + ) for user_id, messages_by_device in local_by_user_then_device.items(): for device_id, message_json in messages_by_device.items(): - rows.append((user_id, device_id, stream_id, message_json)) + rows.append((user_id, device_id, stream_id, message_json, context)) txn.executemany(sql, rows) diff --git a/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql b/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql new file mode 100644 index 0000000000..769c17e6ef --- /dev/null +++ b/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE device_inbox ADD context TEXT; \ No newline at end of file