Correctly handle the 'age' key in events and pdus

This commit is contained in:
Erik Johnston 2014-09-15 13:26:05 +01:00
parent 76217890c0
commit 5bd9369a62
10 changed files with 51 additions and 15 deletions

View file

@ -17,6 +17,18 @@ from synapse.api.errors import SynapseError, Codes
from synapse.util.jsonobject import JsonEncodedObject
def serialize_event(hs, e):
# FIXME(erikj): To handle the case of presence events and the like
if not isinstance(e, SynapseEvent):
return e
d = e.get_dict()
if "age_ts" in d:
d["age"] = int(hs.get_clock().time_msec()) - d["age_ts"]
return d
class SynapseEvent(JsonEncodedObject):
"""Base class for Synapse events. These are JSON objects which must abide
@ -43,6 +55,7 @@ class SynapseEvent(JsonEncodedObject):
"content", # HTTP body, JSON
"state_key",
"required_power_level",
"age_ts",
]
internal_keys = [

View file

@ -59,6 +59,14 @@ class EventFactory(object):
if "ts" not in kwargs:
kwargs["ts"] = int(self.clock.time_msec())
# The "age" key is a delta timestamp that should be converted into an
# absolute timestamp the minute we see it.
if "age" in kwargs:
kwargs["age_ts"] = int(self.clock.time_msec()) - int(kwargs["age"])
del kwargs["age"]
elif "age_ts" not in kwargs:
kwargs["age_ts"] = int(self.clock.time_msec())
if etype in self._event_list:
handler = self._event_list[etype]
else:

View file

@ -291,6 +291,12 @@ class ReplicationLayer(object):
def on_incoming_transaction(self, transaction_data):
transaction = Transaction(**transaction_data)
for p in transaction.pdus:
if "age" in p:
p["age_ts"] = int(self.clock.time_msec()) - int(p["age"])
pdu_list = [Pdu(**p) for p in transaction.pdus]
logger.debug("[%s] Got transaction", transaction.transaction_id)
response = yield self.transaction_actions.have_responded(transaction)
@ -303,8 +309,6 @@ class ReplicationLayer(object):
logger.debug("[%s] Transacition is new", transaction.transaction_id)
pdu_list = [Pdu(**p) for p in transaction.pdus]
dl = []
for pdu in pdu_list:
dl.append(self._handle_new_pdu(pdu))
@ -405,9 +409,14 @@ class ReplicationLayer(object):
"""Returns a new Transaction containing the given PDUs suitable for
transmission.
"""
pdus = [p.get_dict() for p in pdu_list]
for p in pdus:
if "age_ts" in pdus:
p["age"] = int(self.clock.time_msec()) - p["age_ts"]
return Transaction(
pdus=[p.get_dict() for p in pdu_list],
origin=self.server_name,
pdus=pdus,
ts=int(self._clock.time_msec()),
destination=None,
)

View file

@ -15,7 +15,6 @@
from twisted.internet import defer
from synapse.api.events import SynapseEvent
from synapse.util.logutils import log_function
from ._base import BaseHandler
@ -71,10 +70,7 @@ class EventStreamHandler(BaseHandler):
auth_user, room_ids, pagin_config, timeout
)
chunks = [
e.get_dict() if isinstance(e, SynapseEvent) else e
for e in events
]
chunks = [self.hs.serialize_event(e) for e in events]
chunk = {
"chunk": chunks,
@ -92,7 +88,9 @@ class EventStreamHandler(BaseHandler):
# 10 seconds of grace to allow the client to reconnect again
# before we think they're gone
def _later():
logger.debug("_later stopped_user_eventstream %s", auth_user)
logger.debug(
"_later stopped_user_eventstream %s", auth_user
)
self.distributor.fire(
"stopped_user_eventstream", auth_user
)

View file

@ -124,7 +124,7 @@ class MessageHandler(BaseHandler):
)
chunk = {
"chunk": [e.get_dict() for e in events],
"chunk": [self.hs.serialize_event(e) for e in events],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
@ -296,7 +296,7 @@ class MessageHandler(BaseHandler):
end_token = now_token.copy_and_replace("room_key", token[1])
d["messages"] = {
"chunk": [m.get_dict() for m in messages],
"chunk": [self.hs.serialize_event(m) for m in messages],
"start": start_token.to_string(),
"end": end_token.to_string(),
}
@ -304,7 +304,7 @@ class MessageHandler(BaseHandler):
current_state = yield self.store.get_current_state(
event.room_id
)
d["state"] = [c.get_dict() for c in current_state]
d["state"] = [self.hs.serialize_event(c) for c in current_state]
except:
logger.exception("Failed to get snapshot")

View file

@ -335,7 +335,7 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
entry.get_dict()
self.hs.serialize_event(entry)
for entry in member_list
]
chunk_data = {

View file

@ -59,7 +59,7 @@ class EventRestServlet(RestServlet):
event = yield handler.get_event(auth_user, event_id)
if event:
defer.returnValue((200, event.get_dict()))
defer.returnValue((200, self.hs.serialize_event(event)))
else:
defer.returnValue((404, "Event not found."))

View file

@ -378,7 +378,7 @@ class RoomTriggerBackfill(RestServlet):
handler = self.handlers.federation_handler
events = yield handler.backfill(remote_server, room_id, limit)
res = [event.get_dict() for event in events]
res = [self.hs.serialize_event(event) for event in events]
defer.returnValue((200, res))

View file

@ -20,6 +20,7 @@
# Imports required for the default HomeServer() implementation
from synapse.federation import initialize_http_replication
from synapse.api.events import serialize_event
from synapse.api.events.factory import EventFactory
from synapse.notifier import Notifier
from synapse.api.auth import Auth
@ -138,6 +139,9 @@ class BaseHomeServer(object):
object."""
return RoomID.from_string(s, hs=self)
def serialize_event(self, e):
return serialize_event(self, e)
# Build magic accessors for every dependency
for depname in BaseHomeServer.DEPENDENCIES:
BaseHomeServer._make_dependency_method(depname)

View file

@ -315,6 +315,10 @@ class SQLBaseStore(object):
d["content"] = json.loads(d["content"])
del d["unrecognized_keys"]
if "age_ts" not in d:
# For compatibility
d["age_ts"] = d["ts"] if "ts" in d else 0
return self.event_factory.create_event(
etype=d["type"],
**d