Clean up TransactionQueue

This commit is contained in:
Erik Johnston 2016-08-10 14:21:10 +01:00
parent 5aeadb7414
commit ca8abfbf30
4 changed files with 165 additions and 224 deletions

View file

@ -21,8 +21,7 @@ from .units import Transaction
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import ( from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination, get_retry_limiter, NotRetryingDestination,
) )
@ -120,267 +119,213 @@ class TransactionQueue(object):
if not destinations: if not destinations:
return return
deferreds = []
for destination in destinations: for destination in destinations:
deferred = defer.Deferred()
self.pending_pdus_by_dest.setdefault(destination, []).append( self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, deferred, order) (pdu, order)
) )
def chain(failure): preserve_context_over_fn(
if not deferred.called: self._attempt_new_transaction, destination
deferred.errback(failure) )
def log_failure(f):
logger.warn("Failed to send pdu to %s: %s", destination, f.value)
deferred.addErrback(log_failure)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(chain)
deferreds.append(deferred)
# NO inlineCallbacks
def enqueue_edu(self, edu): def enqueue_edu(self, edu):
destination = edu.destination destination = edu.destination
if not self.can_send_to(destination): if not self.can_send_to(destination):
return return
deferred = defer.Deferred() self.pending_edus_by_dest.setdefault(destination, []).append(edu)
self.pending_edus_by_dest.setdefault(destination, []).append(
(edu, deferred) preserve_context_over_fn(
self._attempt_new_transaction, destination
) )
def chain(failure):
if not deferred.called:
deferred.errback(failure)
def log_failure(f):
logger.warn("Failed to send edu to %s: %s", destination, f.value)
deferred.addErrback(log_failure)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(chain)
return deferred
@defer.inlineCallbacks
def enqueue_failure(self, failure, destination): def enqueue_failure(self, failure, destination):
if destination == self.server_name or destination == "localhost": if destination == self.server_name or destination == "localhost":
return return
deferred = defer.Deferred()
if not self.can_send_to(destination): if not self.can_send_to(destination):
return return
self.pending_failures_by_dest.setdefault( self.pending_failures_by_dest.setdefault(
destination, [] destination, []
).append( ).append(failure)
(failure, deferred)
preserve_context_over_fn(
self._attempt_new_transaction, destination
) )
def chain(f):
if not deferred.called:
deferred.errback(f)
def log_failure(f):
logger.warn("Failed to send failure to %s: %s", destination, f.value)
deferred.addErrback(log_failure)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(chain)
yield deferred
@measure_func("attempt_new_transaction")
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination): def _attempt_new_transaction(self, destination):
yield run_on_reactor() yield run_on_reactor()
while True:
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
destination
)
return
# list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
if destination in self.pending_transactions: pending_edus = self.pending_edus_by_dest.pop(destination, [])
# XXX: pending_transactions can get stuck on by a never-ending pending_failures = self.pending_failures_by_dest.pop(destination, [])
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these if pending_pdus:
# requests logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
logger.debug( destination, len(pending_pdus))
"TX [%s] Transaction already in progress",
destination if not pending_pdus and not pending_edus and not pending_failures:
logger.debug("TX [%s] Nothing to send", destination)
return
yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures
) )
return
pending_pdus = self.pending_pdus_by_dest.pop(destination, []) @measure_func("_send_new_transaction")
pending_edus = self.pending_edus_by_dest.pop(destination, []) @defer.inlineCallbacks
pending_failures = self.pending_failures_by_dest.pop(destination, []) def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures):
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures:
logger.debug("TX [%s] Nothing to send", destination)
return
try:
self.pending_transactions[destination] = 1
logger.debug("TX [%s] _attempt_new_transaction", destination)
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[2]) pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus] pdus = [x[0] for x in pending_pdus]
edus = [x[0] for x in pending_edus] edus = pending_edus
failures = [x[0].get_dict() for x in pending_failures] failures = [x.get_dict() for x in pending_failures]
deferreds = [
x[1]
for x in pending_pdus + pending_edus + pending_failures
]
txn_id = str(self._next_txn_id) try:
self.pending_transactions[destination] = 1
limiter = yield get_retry_limiter( logger.debug("TX [%s] _attempt_new_transaction", destination)
destination,
self.clock,
self.store,
)
logger.debug( txn_id = str(self._next_txn_id)
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pending_pdus),
len(pending_edus),
len(pending_failures)
)
logger.debug("TX [%s] Persisting transaction...", destination) limiter = yield get_retry_limiter(
destination,
transaction = Transaction.create_new( self.clock,
origin_server_ts=int(self.clock.time_msec()), self.store,
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pending_pdus),
len(pending_edus),
len(pending_failures),
)
with limiter:
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
if response:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
) )
logger.debug("TX [%s] Sent transaction", destination) logger.debug(
logger.debug("TX [%s] Marking as delivered...", destination) "TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pending_pdus),
len(pending_edus),
len(pending_failures)
)
yield self.transaction_actions.delivered( logger.debug("TX [%s] Persisting transaction...", destination)
transaction, code, response
)
logger.debug("TX [%s] Marked as delivered", destination) transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
logger.debug("TX [%s] Yielding to callbacks...", destination) self._next_txn_id += 1
for deferred in deferreds: yield self.transaction_actions.prepare_to_send(transaction)
if code == 200:
deferred.callback(None)
else:
deferred.errback(RuntimeError("Got status %d" % code))
# Ensures we don't continue until all callbacks on that logger.debug("TX [%s] Persisted transaction", destination)
# deferred have fired logger.info(
try: "TX [%s] {%s} Sending transaction [%s],"
yield deferred " (PDUs: %d, EDUs: %d, failures: %d)",
except: destination, txn_id,
pass transaction.transaction_id,
len(pending_pdus),
len(pending_edus),
len(pending_failures),
)
logger.debug("TX [%s] Yielded to callbacks", destination) with limiter:
except NotRetryingDestination: # Actually send the transaction
logger.info(
"TX [%s] not ready for retry yet - "
"dropping transaction for now",
destination,
)
except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
except Exception as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
for deferred in deferreds: # FIXME (erikj): This is a bit of a hack to make the Pdu age
if not deferred.called: # keys work
deferred.errback(e) def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
finally: try:
# We want to be *very* sure we delete this after we stop processing response = yield self.transport_layer.send_transaction(
self.pending_transactions.pop(destination, None) transaction, json_data_cb
)
code = 200
# Check to see if there is anything else to send. if response:
self._attempt_new_transaction(destination) for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)
yield self.transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] Marked as delivered", destination)
if code != 200:
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
except NotRetryingDestination:
logger.info(
"TX [%s] not ready for retry yet - "
"dropping transaction for now",
destination,
)
except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
except Exception as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)

View file

@ -155,9 +155,7 @@ class MatrixFederationHttpClient(object):
time_out=timeout / 1000. if timeout else 60, time_out=timeout / 1000. if timeout else 60,
) )
response = yield preserve_context_over_fn( response = yield preserve_context_over_fn(send_request)
send_request,
)
log_result = "%d %s" % (response.code, response.phrase,) log_result = "%d %s" % (response.code, response.phrase,)
break break

View file

@ -317,7 +317,6 @@ def preserve_fn(f):
def g(*args, **kwargs): def g(*args, **kwargs):
with PreserveLoggingContext(current): with PreserveLoggingContext(current):
return f(*args, **kwargs) return f(*args, **kwargs)
return g return g

View file

@ -78,7 +78,6 @@ class Measure(object):
self.start = self.clock.time_msec() self.start = self.clock.time_msec()
self.start_context = LoggingContext.current_context() self.start_context = LoggingContext.current_context()
if not self.start_context: if not self.start_context:
logger.warn("Entered Measure without log context: %s", self.name)
self.start_context = LoggingContext("Measure") self.start_context = LoggingContext("Measure")
self.start_context.__enter__() self.start_context.__enter__()
self.created_context = True self.created_context = True
@ -99,7 +98,7 @@ class Measure(object):
if context != self.start_context: if context != self.start_context:
logger.warn( logger.warn(
"Context has unexpectedly changed from '%s' to '%s'. (%r)", "Context has unexpectedly changed from '%s' to '%s'. (%r)",
context, self.start_context, self.name self.start_context, context, self.name
) )
return return