Only relay 'live' events

This commit is contained in:
Erik Johnston 2018-11-27 17:30:11 +00:00 committed by Brendan Abolivier
parent e2230b28fb
commit 2d8da62feb
4 changed files with 43 additions and 12 deletions

View file

@ -79,16 +79,16 @@ class FederationBase(object):
allow_none=True, allow_none=True,
) )
if not res and pdu.origin != origin: # if not res and pdu.origin != origin:
try: # try:
res = yield self.get_pdu( # res = yield self.get_pdu(
destinations=[pdu.origin], # destinations=[pdu.origin],
event_id=pdu.event_id, # event_id=pdu.event_id,
outlier=outlier, # outlier=outlier,
timeout=10000, # timeout=10000,
) # )
except SynapseError: # except SynapseError:
pass # pass
if not res: if not res:
logger.warn( logger.warn(

View file

@ -177,7 +177,7 @@ class TransactionQueue(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_event(event): def handle_event(event):
should_relay = yield self._should_relay(event) should_relay = yield self._should_relay(event, False)
logger.info("Should relay event %s: %s", event.event_id, should_relay) logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay: if not should_relay:
return return
@ -249,6 +249,20 @@ class TransactionQueue(object):
finally: finally:
self._is_processing = False self._is_processing = False
@defer.inlineCallbacks
def received_new_event(self, origin, event):
should_relay = yield self._should_relay(event, True)
logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay:
return
destinations = event.unsigned.get("destinations")
destinations = set(destinations)
logger.debug("Sending %s to %r", event, destinations)
yield self._send_pdu(event, destinations)
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_pdu(self, pdu, destinations, span=None): def _send_pdu(self, pdu, destinations, span=None):
# We loop through all destinations to see whether we already have # We loop through all destinations to see whether we already have
@ -352,12 +366,18 @@ class TransactionQueue(object):
return joined_hosts return joined_hosts
def _should_relay(self, event): def _should_relay(self, event, from_federation):
"""Whether we should consider relaying this event. """Whether we should consider relaying this event.
""" """
# XXX: Hook for routing shenanigans # XXX: Hook for routing shenanigans
<<<<<<< HEAD
=======
if from_federation and event.unsigned.get("destinations"):
return True
>>>>>>> efdec3252... Only relay 'live' events
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id) is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None: if not is_mine and send_on_behalf_of is None:

View file

@ -48,6 +48,7 @@ from synapse.crypto.event_signing import (
add_hashes_and_signatures, add_hashes_and_signatures,
compute_event_signature, compute_event_signature,
) )
from synapse.events import FrozenEvent
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.replication.http.federation import ( from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet, ReplicationCleanRoomRestServlet,
@ -111,6 +112,7 @@ class FederationHandler(BaseHandler):
self.store = hs.get_datastore() # type: synapse.storage.DataStore self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client() self.federation_client = hs.get_federation_client()
self.federation_sender = hs.get_federation_sender()
self.state_handler = hs.get_state_handler() self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
self.keyring = hs.get_keyring() self.keyring = hs.get_keyring()
@ -435,6 +437,10 @@ class FederationHandler(BaseHandler):
logger.info("Thread ID %r", thread_id) logger.info("Thread ID %r", thread_id)
# Remove destinations field before persisting
event_copy = FrozenEvent.from_event(pdu)
pdu.unsigned.pop("destinations", None)
yield self._process_received_pdu( yield self._process_received_pdu(
origin, origin,
pdu, pdu,
@ -443,6 +449,9 @@ class FederationHandler(BaseHandler):
thread_id=thread_id, thread_id=thread_id,
) )
if sent_to_us_directly:
yield self.federation_sender.received_new_event(origin, event_copy)
if new_thread: if new_thread:
builder = self.event_builder_factory.new({ builder = self.event_builder_factory.new({
"type": "org.matrix.new_thread", "type": "org.matrix.new_thread",

View file

@ -590,6 +590,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
prev_event_id IN (%s) prev_event_id IN (%s)
AND NOT events.outlier AND NOT events.outlier
AND rejections.event_id IS NULL AND rejections.event_id IS NULL
AND NOT events.internal_event
""" % ( """ % (
",".join("?" for _ in batch), ",".join("?" for _ in batch),
) )
@ -1290,6 +1291,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
and isinstance(event.content["url"], text_type) and isinstance(event.content["url"], text_type)
), ),
"thread_id": ctx.thread_id, "thread_id": ctx.thread_id,
"internal_event": event.internal_metadata.is_internal_event(),
} }
for event, ctx in events_and_contexts for event, ctx in events_and_contexts
], ],