Merge pull request #88 from matrix-org/batched_get_pdu

Batched get pdu
This commit is contained in:
Erik Johnston 2015-03-02 14:54:27 +00:00
commit b2d2118476
8 changed files with 230 additions and 56 deletions

View file

@ -439,6 +439,25 @@ class FederationClient(FederationBase):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def get_missing_events(self, destination, room_id, earliest_events,
latest_events, limit, min_depth):
content = yield self.transport_layer.get_missing_events(
destination, room_id, earliest_events, latest_events, limit,
min_depth,
)
events = [
self.event_from_pdu_json(e)
for e in content.get("events", [])
]
signed_events = yield self._check_sigs_and_hash_and_fetch(
destination, events, outlier=True
)
defer.returnValue(signed_events)
def event_from_pdu_json(self, pdu_json, outlier=False): def event_from_pdu_json(self, pdu_json, outlier=False):
event = FrozenEvent( event = FrozenEvent(
pdu_json pdu_json

View file

@ -125,6 +125,7 @@ class FederationServer(FederationBase):
results.append({"error": str(e)}) results.append({"error": str(e)})
except Exception as e: except Exception as e:
results.append({"error": str(e)}) results.append({"error": str(e)})
logger.exception("Failed to handle PDU")
if hasattr(transaction, "edus"): if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]: for edu in [Edu(**x) for x in transaction.edus]:
@ -297,6 +298,20 @@ class FederationServer(FederationBase):
(200, send_content) (200, send_content)
) )
@defer.inlineCallbacks
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
missing_events = yield self.handler.on_get_missing_events(
origin, room_id, earliest_events, latest_events, limit, min_depth
)
time_now = self._clock.time_msec()
defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
})
@log_function @log_function
def _get_persisted_pdu(self, origin, event_id, do_auth=True): def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id. """ Get a PDU from the database with given origin and id.
@ -323,7 +338,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _handle_new_pdu(self, origin, pdu, max_recursion=10): def _handle_new_pdu(self, origin, pdu, get_missing=True):
# We reprocess pdus when we have seen them only as outliers # We reprocess pdus when we have seen them only as outliers
existing = yield self._get_persisted_pdu( existing = yield self._get_persisted_pdu(
origin, pdu.event_id, do_auth=False origin, pdu.event_id, do_auth=False
@ -375,48 +390,50 @@ class FederationServer(FederationBase):
pdu.room_id, min_depth pdu.room_id, min_depth
) )
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if min_depth and pdu.depth < min_depth: if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this # This is so that we don't notify the user about this
# message, to work around the fact that some events will # message, to work around the fact that some events will
# reference really really old events we really don't want to # reference really really old events we really don't want to
# send to the clients. # send to the clients.
pdu.internal_metadata.outlier = True pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth and max_recursion > 0: elif min_depth and pdu.depth > min_depth:
for event_id, hashes in pdu.prev_events: if get_missing and prevs - seen:
if event_id not in have_seen: latest_tuples = yield self.store.get_latest_events_in_room(
logger.debug( pdu.room_id
"_handle_new_pdu requesting pdu %s", )
event_id
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(e_id for e_id, _, _ in latest_tuples)
latest |= seen
missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
earliest_events=list(latest),
latest_events=[pdu.event_id],
limit=10,
min_depth=min_depth,
)
for e in missing_events:
yield self._handle_new_pdu(
origin,
e,
get_missing=False
) )
try: have_seen = yield self.store.have_events(
new_pdu = yield self.federation_client.get_pdu( [ev for ev, _ in pdu.prev_events]
[origin, pdu.origin], )
event_id=event_id,
)
if new_pdu: prevs = {e_id for e_id, _ in pdu.prev_events}
yield self._handle_new_pdu( seen = set(have_seen.keys())
origin, if prevs - seen:
new_pdu, fetch_state = True
max_recursion=max_recursion-1
)
logger.debug("Processed pdu %s", event_id)
else:
logger.warn("Failed to get PDU %s", event_id)
fetch_state = True
except:
# TODO(erikj): Do some more intelligent retries.
logger.exception("Failed to get PDU")
fetch_state = True
else:
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if prevs - seen:
fetch_state = True
else:
fetch_state = True
if fetch_state: if fetch_state:
# We need to get the state at this event, since we haven't # We need to get the state at this event, since we haven't

View file

@ -224,6 +224,8 @@ class TransactionQueue(object):
] ]
try: try:
self.pending_transactions[destination] = 1
limiter = yield get_retry_limiter( limiter = yield get_retry_limiter(
destination, destination,
self._clock, self._clock,
@ -239,8 +241,6 @@ class TransactionQueue(object):
len(pending_failures) len(pending_failures)
) )
self.pending_transactions[destination] = 1
logger.debug("TX [%s] Persisting transaction...", destination) logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new( transaction = Transaction.create_new(
@ -287,7 +287,7 @@ class TransactionQueue(object):
code = 200 code = 200
if response: if response:
for e_id, r in getattr(response, "pdus", {}).items(): for e_id, r in response.get("pdus", {}).items():
if "error" in r: if "error" in r:
logger.warn( logger.warn(
"Transaction returned error for %s: %s", "Transaction returned error for %s: %s",

View file

@ -219,3 +219,22 @@ class TransportLayerClient(object):
) )
defer.returnValue(content) defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def get_missing_events(self, destination, room_id, earliest_events,
latest_events, limit, min_depth):
path = PREFIX + "/get_missing_events/%s" % (room_id,)
content = yield self.client.post_json(
destination=destination,
path=path,
data={
"limit": int(limit),
"min_depth": int(min_depth),
"earliest_events": earliest_events,
"latest_events": latest_events,
}
)
defer.returnValue(content)

View file

@ -242,6 +242,7 @@ class TransportLayerServer(object):
) )
) )
) )
self.server.register_path( self.server.register_path(
"POST", "POST",
re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"),
@ -253,6 +254,17 @@ class TransportLayerServer(object):
) )
) )
self.server.register_path(
"POST",
re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"),
self._with_authentication(
lambda origin, content, query, room_id:
self._get_missing_events(
origin, content, room_id,
)
)
)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _on_send_request(self, origin, content, query, transaction_id): def _on_send_request(self, origin, content, query, transaction_id):
@ -352,3 +364,22 @@ class TransportLayerServer(object):
) )
defer.returnValue((200, new_content)) defer.returnValue((200, new_content))
@defer.inlineCallbacks
@log_function
def _get_missing_events(self, origin, content, room_id):
limit = int(content.get("limit", 10))
min_depth = int(content.get("min_depth", 0))
earliest_events = content.get("earliest_events", [])
latest_events = content.get("latest_events", [])
content = yield self.request_handler.on_get_missing_events(
origin,
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
min_depth=min_depth,
limit=limit,
)
defer.returnValue((200, content))

View file

@ -581,12 +581,13 @@ class FederationHandler(BaseHandler):
defer.returnValue(event) defer.returnValue(event)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_for_pdu(self, origin, room_id, event_id): def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
yield run_on_reactor() yield run_on_reactor()
in_room = yield self.auth.check_host_in_room(room_id, origin) if do_auth:
if not in_room: in_room = yield self.auth.check_host_in_room(room_id, origin)
raise AuthError(403, "Host not in room.") if not in_room:
raise AuthError(403, "Host not in room.")
state_groups = yield self.store.get_state_groups( state_groups = yield self.store.get_state_groups(
[event_id] [event_id]
@ -788,6 +789,29 @@ class FederationHandler(BaseHandler):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
in_room = yield self.auth.check_host_in_room(
room_id,
origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
limit = min(limit, 20)
min_depth = max(min_depth, 0)
missing_events = yield self.store.get_missing_events(
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
limit=limit,
min_depth=min_depth,
)
defer.returnValue(missing_events)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def do_auth(self, origin, event, context, auth_events): def do_auth(self, origin, event, context, auth_events):

View file

@ -64,6 +64,9 @@ class EventFederationStore(SQLBaseStore):
for f in front: for f in front:
txn.execute(base_sql, (f,)) txn.execute(base_sql, (f,))
new_front.update([r[0] for r in txn.fetchall()]) new_front.update([r[0] for r in txn.fetchall()])
new_front -= results
front = new_front front = new_front
results.update(front) results.update(front)
@ -378,3 +381,51 @@ class EventFederationStore(SQLBaseStore):
event_results += new_front event_results += new_front
return self._get_events_txn(txn, event_results) return self._get_events_txn(txn, event_results)
def get_missing_events(self, room_id, earliest_events, latest_events,
limit, min_depth):
return self.runInteraction(
"get_missing_events",
self._get_missing_events,
room_id, earliest_events, latest_events, limit, min_depth
)
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
limit, min_depth):
earliest_events = set(earliest_events)
front = set(latest_events) - earliest_events
event_results = set()
query = (
"SELECT prev_event_id FROM event_edges "
"WHERE room_id = ? AND event_id = ? AND is_state = 0 "
"LIMIT ?"
)
while front and len(event_results) < limit:
new_front = set()
for event_id in front:
txn.execute(
query,
(room_id, event_id, limit - len(event_results))
)
for e_id, in txn.fetchall():
new_front.add(e_id)
new_front -= earliest_events
new_front -= event_results
front = new_front
event_results |= new_front
events = self._get_events_txn(txn, event_results)
events = sorted(
[ev for ev in events if ev.depth >= min_depth],
key=lambda e: e.depth,
)
return events[:limit]

View file

@ -389,14 +389,18 @@ class PresenceInvitesTestCase(PresenceTestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_invite_remote(self): def test_invite_remote(self):
# Use a different destination, otherwise retry logic might fail the
# request
u_rocket = UserID.from_string("@rocket:there")
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("elsewhere", call("there",
path="/_matrix/federation/v1/send/1000000/", path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu("elsewhere", "m.presence_invite", data=_expect_edu("there", "m.presence_invite",
content={ content={
"observer_user": "@apple:test", "observer_user": "@apple:test",
"observed_user": "@cabbage:elsewhere", "observed_user": "@rocket:there",
} }
), ),
json_data_callback=ANY, json_data_callback=ANY,
@ -405,10 +409,10 @@ class PresenceInvitesTestCase(PresenceTestCase):
) )
yield self.handler.send_invite( yield self.handler.send_invite(
observer_user=self.u_apple, observed_user=self.u_cabbage) observer_user=self.u_apple, observed_user=u_rocket)
self.assertEquals( self.assertEquals(
[{"observed_user_id": "@cabbage:elsewhere", "accepted": 0}], [{"observed_user_id": "@rocket:there", "accepted": 0}],
(yield self.datastore.get_presence_list(self.u_apple.localpart)) (yield self.datastore.get_presence_list(self.u_apple.localpart))
) )
@ -418,13 +422,18 @@ class PresenceInvitesTestCase(PresenceTestCase):
def test_accept_remote(self): def test_accept_remote(self):
# TODO(paul): This test will likely break if/when real auth permissions # TODO(paul): This test will likely break if/when real auth permissions
# are added; for now the HS will always accept any invite # are added; for now the HS will always accept any invite
# Use a different destination, otherwise retry logic might fail the
# request
u_rocket = UserID.from_string("@rocket:moon")
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("elsewhere", call("moon",
path="/_matrix/federation/v1/send/1000000/", path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu("elsewhere", "m.presence_accept", data=_expect_edu("moon", "m.presence_accept",
content={ content={
"observer_user": "@cabbage:elsewhere", "observer_user": "@rocket:moon",
"observed_user": "@apple:test", "observed_user": "@apple:test",
} }
), ),
@ -437,7 +446,7 @@ class PresenceInvitesTestCase(PresenceTestCase):
"/_matrix/federation/v1/send/1000000/", "/_matrix/federation/v1/send/1000000/",
_make_edu_json("elsewhere", "m.presence_invite", _make_edu_json("elsewhere", "m.presence_invite",
content={ content={
"observer_user": "@cabbage:elsewhere", "observer_user": "@rocket:moon",
"observed_user": "@apple:test", "observed_user": "@apple:test",
} }
) )
@ -446,7 +455,7 @@ class PresenceInvitesTestCase(PresenceTestCase):
self.assertTrue( self.assertTrue(
(yield self.datastore.is_presence_visible( (yield self.datastore.is_presence_visible(
observed_localpart=self.u_apple.localpart, observed_localpart=self.u_apple.localpart,
observer_userid=self.u_cabbage.to_string(), observer_userid=u_rocket.to_string(),
)) ))
) )
@ -454,13 +463,17 @@ class PresenceInvitesTestCase(PresenceTestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_invited_remote_nonexistant(self): def test_invited_remote_nonexistant(self):
# Use a different destination, otherwise retry logic might fail the
# request
u_rocket = UserID.from_string("@rocket:sun")
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("elsewhere", call("sun",
path="/_matrix/federation/v1/send/1000000/", path="/_matrix/federation/v1/send/1000000/",
data=_expect_edu("elsewhere", "m.presence_deny", data=_expect_edu("sun", "m.presence_deny",
content={ content={
"observer_user": "@cabbage:elsewhere", "observer_user": "@rocket:sun",
"observed_user": "@durian:test", "observed_user": "@durian:test",
} }
), ),
@ -471,9 +484,9 @@ class PresenceInvitesTestCase(PresenceTestCase):
yield self.mock_federation_resource.trigger("PUT", yield self.mock_federation_resource.trigger("PUT",
"/_matrix/federation/v1/send/1000000/", "/_matrix/federation/v1/send/1000000/",
_make_edu_json("elsewhere", "m.presence_invite", _make_edu_json("sun", "m.presence_invite",
content={ content={
"observer_user": "@cabbage:elsewhere", "observer_user": "@rocket:sun",
"observed_user": "@durian:test", "observed_user": "@durian:test",
} }
) )