Fix threading

This commit is contained in:
Erik Johnston 2018-11-20 17:04:19 +00:00
parent 607ac7ea37
commit 115e4bb4c6
3 changed files with 29 additions and 11 deletions

View file

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
import random
import six import six
from six import iteritems from six import iteritems
@ -71,6 +72,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler self.handler = hs.get_handlers().federation_handler
self.clock = hs.get_clock()
self._server_linearizer = Linearizer("fed_server") self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler") self._transaction_linearizer = Linearizer("fed_txn_handler")
@ -208,12 +210,25 @@ class FederationServer(FederationBase):
pdu_results[event_id] = e.error_dict() pdu_results[event_id] = e.error_dict()
return return
thread_id = random.randint(0, 999999999)
pdu_to_thread = {}
first_in_thread = True
for pdu in reversed(pdus_by_room[room_id]):
now = self.clock.time_msec()
if now - pdu.origin_server_ts > 1 * 60 * 1000:
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
for pdu in pdus_by_room[room_id]: for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id event_id = pdu.event_id
with nested_logging_context(event_id): with nested_logging_context(event_id):
thread_id, new_thread = pdu_to_thread[pdu.event_id]
try: try:
yield self._handle_received_pdu( yield self._handle_received_pdu(
origin, pdu origin, pdu, thread_id=thread_id,
new_thread=new_thread
) )
pdu_results[event_id] = {} pdu_results[event_id] = {}
except FederationError as e: except FederationError as e:
@ -571,7 +586,7 @@ class FederationServer(FederationBase):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_received_pdu(self, origin, pdu): def _handle_received_pdu(self, origin, pdu, thread_id, new_thread):
""" Process a PDU received in a federation /send/ transaction. """ Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError. If the event is invalid, then this method throws a FederationError.
@ -638,6 +653,7 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu( yield self.handler.on_receive_pdu(
origin, pdu, sent_to_us_directly=True, origin, pdu, sent_to_us_directly=True,
thread_id=thread_id, new_thread=new_thread,
) )
def __str__(self): def __str__(self):

View file

@ -138,6 +138,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_receive_pdu( def on_receive_pdu(
self, origin, pdu, sent_to_us_directly=False, thread_id=None, self, origin, pdu, sent_to_us_directly=False, thread_id=None,
new_thread=False,
): ):
""" Process a PDU received via a federation /send/ transaction, or """ Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events via backfill of missing prev_events
@ -461,7 +462,7 @@ class FederationHandler(BaseHandler):
create_requester(UserID("server", "server")), create_requester(UserID("server", "server")),
event, event,
context, context,
ratelimit=True, ratelimit=False,
extra_users=[], extra_users=[],
do_auth=False, do_auth=False,
) )

View file

@ -253,13 +253,13 @@ class MatrixFederationHttpClient(object):
): ):
raise FederationDeniedError(request.destination) raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter( # limiter = yield synapse.util.retryutils.get_retry_limiter(
request.destination, # request.destination,
self.clock, # self.clock,
self._store, # self._store,
backoff_on_404=backoff_on_404, # backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff, # ignore_backoff=ignore_backoff,
) # )
method_bytes = request.method.encode("ascii") method_bytes = request.method.encode("ascii")
destination_bytes = request.destination.encode("ascii") destination_bytes = request.destination.encode("ascii")
@ -274,7 +274,8 @@ class MatrixFederationHttpClient(object):
b"Host": [destination_bytes], b"Host": [destination_bytes],
} }
with limiter: # with limiter:
if True:
# XXX: Would be much nicer to retry only at the transaction-layer # XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place) # (once we have reliable transactions in place)
if long_retries: if long_retries: