mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-23 01:55:53 +03:00
Merge branch 'develop' of github.com:matrix-org/synapse into federation_rate_limit
This commit is contained in:
commit
4195e55ccc
10 changed files with 284 additions and 59 deletions
|
@ -4,6 +4,8 @@ Changes in synapse vx.x.x (x-x-x)
|
||||||
* Add support for registration fallback. This is a page hosted on the server
|
* Add support for registration fallback. This is a page hosted on the server
|
||||||
which allows a user to register for an account, regardless of what client
|
which allows a user to register for an account, regardless of what client
|
||||||
they are using (e.g. mobile devices).
|
they are using (e.g. mobile devices).
|
||||||
|
* Application services can now poll on the CS API ``/events`` for their events,
|
||||||
|
by providing their application service ``access_token``.
|
||||||
|
|
||||||
Changes in synapse v0.7.1 (2015-02-19)
|
Changes in synapse v0.7.1 (2015-02-19)
|
||||||
======================================
|
======================================
|
||||||
|
|
|
@ -112,17 +112,19 @@ class FederationServer(FederationBase):
|
||||||
logger.debug("[%s] Transaction is new", transaction.transaction_id)
|
logger.debug("[%s] Transaction is new", transaction.transaction_id)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
dl = []
|
results = []
|
||||||
|
|
||||||
for pdu in pdu_list:
|
for pdu in pdu_list:
|
||||||
d = self._handle_new_pdu(transaction.origin, pdu)
|
d = self._handle_new_pdu(transaction.origin, pdu)
|
||||||
|
|
||||||
def handle_failure(failure):
|
try:
|
||||||
failure.trap(FederationError)
|
yield d
|
||||||
self.send_failure(failure.value, transaction.origin)
|
results.append({})
|
||||||
|
except FederationError as e:
|
||||||
d.addErrback(handle_failure)
|
self.send_failure(e, transaction.origin)
|
||||||
|
results.append({"error": str(e)})
|
||||||
dl.append(d)
|
except Exception as e:
|
||||||
|
results.append({"error": str(e)})
|
||||||
|
|
||||||
if hasattr(transaction, "edus"):
|
if hasattr(transaction, "edus"):
|
||||||
for edu in [Edu(**x) for x in transaction.edus]:
|
for edu in [Edu(**x) for x in transaction.edus]:
|
||||||
|
@ -135,21 +137,11 @@ class FederationServer(FederationBase):
|
||||||
for failure in getattr(transaction, "pdu_failures", []):
|
for failure in getattr(transaction, "pdu_failures", []):
|
||||||
logger.info("Got failure %r", failure)
|
logger.info("Got failure %r", failure)
|
||||||
|
|
||||||
results = yield defer.DeferredList(dl, consumeErrors=True)
|
logger.debug("Returning: %s", str(results))
|
||||||
|
|
||||||
ret = []
|
|
||||||
for r in results:
|
|
||||||
if r[0]:
|
|
||||||
ret.append({})
|
|
||||||
else:
|
|
||||||
logger.exception(r[1])
|
|
||||||
ret.append({"error": str(r[1].value)})
|
|
||||||
|
|
||||||
logger.debug("Returning: %s", str(ret))
|
|
||||||
|
|
||||||
response = {
|
response = {
|
||||||
"pdus": dict(zip(
|
"pdus": dict(zip(
|
||||||
(p.event_id for p in pdu_list), ret
|
(p.event_id for p in pdu_list), results
|
||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
self._streams_per_user[auth_user] += 1
|
self._streams_per_user[auth_user] += 1
|
||||||
|
|
||||||
if pagin_config.from_token is None:
|
|
||||||
pagin_config.from_token = None
|
|
||||||
|
|
||||||
rm_handler = self.hs.get_handlers().room_member_handler
|
rm_handler = self.hs.get_handlers().room_member_handler
|
||||||
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
|
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
|
||||||
|
|
||||||
|
|
|
@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler):
|
||||||
def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
|
def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
|
||||||
"""Returns a list of roomids that the user has any of the given
|
"""Returns a list of roomids that the user has any of the given
|
||||||
membership states in."""
|
membership states in."""
|
||||||
rooms = yield self.store.get_rooms_for_user_where_membership_is(
|
|
||||||
user_id=user.to_string(), membership_list=membership_list
|
app_service = yield self.store.get_app_service_by_user_id(
|
||||||
|
user.to_string()
|
||||||
)
|
)
|
||||||
|
if app_service:
|
||||||
|
rooms = yield self.store.get_app_service_rooms(app_service)
|
||||||
|
else:
|
||||||
|
rooms = yield self.store.get_rooms_for_user_where_membership_is(
|
||||||
|
user_id=user.to_string(), membership_list=membership_list
|
||||||
|
)
|
||||||
|
|
||||||
# For some reason the list of events contains duplicates
|
# For some reason the list of events contains duplicates
|
||||||
# TODO(paul): work out why because I really don't think it should
|
# TODO(paul): work out why because I really don't think it should
|
||||||
|
@ -559,13 +566,24 @@ class RoomEventSource(object):
|
||||||
|
|
||||||
to_key = yield self.get_current_key()
|
to_key = yield self.get_current_key()
|
||||||
|
|
||||||
events, end_key = yield self.store.get_room_events_stream(
|
app_service = yield self.store.get_app_service_by_user_id(
|
||||||
user_id=user.to_string(),
|
user.to_string()
|
||||||
from_key=from_key,
|
|
||||||
to_key=to_key,
|
|
||||||
room_id=None,
|
|
||||||
limit=limit,
|
|
||||||
)
|
)
|
||||||
|
if app_service:
|
||||||
|
events, end_key = yield self.store.get_appservice_room_stream(
|
||||||
|
service=app_service,
|
||||||
|
from_key=from_key,
|
||||||
|
to_key=to_key,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
events, end_key = yield self.store.get_room_events_stream(
|
||||||
|
user_id=user.to_string(),
|
||||||
|
from_key=from_key,
|
||||||
|
to_key=to_key,
|
||||||
|
room_id=None,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue((events, end_key))
|
defer.returnValue((events, end_key))
|
||||||
|
|
||||||
|
|
|
@ -36,8 +36,10 @@ class _NotificationListener(object):
|
||||||
so that it can remove itself from the indexes in the Notifier class.
|
so that it can remove itself from the indexes in the Notifier class.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, user, rooms, from_token, limit, timeout, deferred):
|
def __init__(self, user, rooms, from_token, limit, timeout, deferred,
|
||||||
|
appservice=None):
|
||||||
self.user = user
|
self.user = user
|
||||||
|
self.appservice = appservice
|
||||||
self.from_token = from_token
|
self.from_token = from_token
|
||||||
self.limit = limit
|
self.limit = limit
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
|
@ -65,6 +67,10 @@ class _NotificationListener(object):
|
||||||
lst.discard(self)
|
lst.discard(self)
|
||||||
|
|
||||||
notifier.user_to_listeners.get(self.user, set()).discard(self)
|
notifier.user_to_listeners.get(self.user, set()).discard(self)
|
||||||
|
if self.appservice:
|
||||||
|
notifier.appservice_to_listeners.get(
|
||||||
|
self.appservice, set()
|
||||||
|
).discard(self)
|
||||||
|
|
||||||
|
|
||||||
class Notifier(object):
|
class Notifier(object):
|
||||||
|
@ -79,6 +85,7 @@ class Notifier(object):
|
||||||
|
|
||||||
self.rooms_to_listeners = {}
|
self.rooms_to_listeners = {}
|
||||||
self.user_to_listeners = {}
|
self.user_to_listeners = {}
|
||||||
|
self.appservice_to_listeners = {}
|
||||||
|
|
||||||
self.event_sources = hs.get_event_sources()
|
self.event_sources = hs.get_event_sources()
|
||||||
|
|
||||||
|
@ -114,6 +121,17 @@ class Notifier(object):
|
||||||
for user in extra_users:
|
for user in extra_users:
|
||||||
listeners |= self.user_to_listeners.get(user, set()).copy()
|
listeners |= self.user_to_listeners.get(user, set()).copy()
|
||||||
|
|
||||||
|
for appservice in self.appservice_to_listeners:
|
||||||
|
# TODO (kegan): Redundant appservice listener checks?
|
||||||
|
# App services will already be in the rooms_to_listeners set, but
|
||||||
|
# that isn't enough. They need to be checked here in order to
|
||||||
|
# receive *invites* for users they are interested in. Does this
|
||||||
|
# make the rooms_to_listeners check somewhat obselete?
|
||||||
|
if appservice.is_interested(event):
|
||||||
|
listeners |= self.appservice_to_listeners.get(
|
||||||
|
appservice, set()
|
||||||
|
).copy()
|
||||||
|
|
||||||
logger.debug("on_new_room_event listeners %s", listeners)
|
logger.debug("on_new_room_event listeners %s", listeners)
|
||||||
|
|
||||||
# TODO (erikj): Can we make this more efficient by hitting the
|
# TODO (erikj): Can we make this more efficient by hitting the
|
||||||
|
@ -280,6 +298,10 @@ class Notifier(object):
|
||||||
if not from_token:
|
if not from_token:
|
||||||
from_token = yield self.event_sources.get_current_token()
|
from_token = yield self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
|
||||||
|
user.to_string()
|
||||||
|
)
|
||||||
|
|
||||||
listener = _NotificationListener(
|
listener = _NotificationListener(
|
||||||
user,
|
user,
|
||||||
rooms,
|
rooms,
|
||||||
|
@ -287,6 +309,7 @@ class Notifier(object):
|
||||||
limit,
|
limit,
|
||||||
timeout,
|
timeout,
|
||||||
deferred,
|
deferred,
|
||||||
|
appservice=appservice
|
||||||
)
|
)
|
||||||
|
|
||||||
def _timeout_listener():
|
def _timeout_listener():
|
||||||
|
@ -319,6 +342,11 @@ class Notifier(object):
|
||||||
|
|
||||||
self.user_to_listeners.setdefault(listener.user, set()).add(listener)
|
self.user_to_listeners.setdefault(listener.user, set()).add(listener)
|
||||||
|
|
||||||
|
if listener.appservice:
|
||||||
|
self.appservice_to_listeners.setdefault(
|
||||||
|
listener.appservice, set()
|
||||||
|
).add(listener)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def _check_for_updates(self, listener):
|
def _check_for_updates(self, listener):
|
||||||
|
|
|
@ -450,7 +450,8 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
table : string giving the table name
|
table : string giving the table name
|
||||||
keyvalues : dict of column names and values to select the rows with
|
keyvalues : dict of column names and values to select the rows with,
|
||||||
|
or None to not apply a WHERE clause.
|
||||||
retcols : list of strings giving the names of the columns to return
|
retcols : list of strings giving the names of the columns to return
|
||||||
"""
|
"""
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
|
@ -469,13 +470,20 @@ class SQLBaseStore(object):
|
||||||
keyvalues : dict of column names and values to select the rows with
|
keyvalues : dict of column names and values to select the rows with
|
||||||
retcols : list of strings giving the names of the columns to return
|
retcols : list of strings giving the names of the columns to return
|
||||||
"""
|
"""
|
||||||
sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
|
if keyvalues:
|
||||||
", ".join(retcols),
|
sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
|
||||||
table,
|
", ".join(retcols),
|
||||||
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
|
table,
|
||||||
)
|
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
|
||||||
|
)
|
||||||
|
txn.execute(sql, keyvalues.values())
|
||||||
|
else:
|
||||||
|
sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
|
||||||
|
", ".join(retcols),
|
||||||
|
table
|
||||||
|
)
|
||||||
|
txn.execute(sql)
|
||||||
|
|
||||||
txn.execute(sql, keyvalues.values())
|
|
||||||
return self.cursor_to_dict(txn)
|
return self.cursor_to_dict(txn)
|
||||||
|
|
||||||
def _simple_update_one(self, table, keyvalues, updatevalues,
|
def _simple_update_one(self, table, keyvalues, updatevalues,
|
||||||
|
|
|
@ -15,8 +15,10 @@
|
||||||
import logging
|
import logging
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.api.constants import Membership
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.appservice import ApplicationService
|
from synapse.appservice import ApplicationService
|
||||||
|
from synapse.storage.roommember import RoomsForUser
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
|
||||||
|
@ -150,9 +152,32 @@ class ApplicationServiceStore(SQLBaseStore):
|
||||||
yield self.cache_defer # make sure the cache is ready
|
yield self.cache_defer # make sure the cache is ready
|
||||||
defer.returnValue(self.services_cache)
|
defer.returnValue(self.services_cache)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_app_service_by_user_id(self, user_id):
|
||||||
|
"""Retrieve an application service from their user ID.
|
||||||
|
|
||||||
|
All application services have associated with them a particular user ID.
|
||||||
|
There is no distinguishing feature on the user ID which indicates it
|
||||||
|
represents an application service. This function allows you to map from
|
||||||
|
a user ID to an application service.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id(str): The user ID to see if it is an application service.
|
||||||
|
Returns:
|
||||||
|
synapse.appservice.ApplicationService or None.
|
||||||
|
"""
|
||||||
|
|
||||||
|
yield self.cache_defer # make sure the cache is ready
|
||||||
|
|
||||||
|
for service in self.services_cache:
|
||||||
|
if service.sender == user_id:
|
||||||
|
defer.returnValue(service)
|
||||||
|
return
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_app_service_by_token(self, token, from_cache=True):
|
def get_app_service_by_token(self, token, from_cache=True):
|
||||||
"""Get the application service with the given token.
|
"""Get the application service with the given appservice token.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
token (str): The application service token.
|
token (str): The application service token.
|
||||||
|
@ -173,6 +198,77 @@ class ApplicationServiceStore(SQLBaseStore):
|
||||||
# TODO: The from_cache=False impl
|
# TODO: The from_cache=False impl
|
||||||
# TODO: This should be JOINed with the application_services_regex table.
|
# TODO: This should be JOINed with the application_services_regex table.
|
||||||
|
|
||||||
|
def get_app_service_rooms(self, service):
|
||||||
|
"""Get a list of RoomsForUser for this application service.
|
||||||
|
|
||||||
|
Application services may be "interested" in lots of rooms depending on
|
||||||
|
the room ID, the room aliases, or the members in the room. This function
|
||||||
|
takes all of these into account and returns a list of RoomsForUser which
|
||||||
|
represent the entire list of room IDs that this application service
|
||||||
|
wants to know about.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
service: The application service to get a room list for.
|
||||||
|
Returns:
|
||||||
|
A list of RoomsForUser.
|
||||||
|
"""
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_app_service_rooms",
|
||||||
|
self._get_app_service_rooms_txn,
|
||||||
|
service,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_app_service_rooms_txn(self, txn, service):
|
||||||
|
# get all rooms matching the room ID regex.
|
||||||
|
room_entries = self._simple_select_list_txn(
|
||||||
|
txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
|
||||||
|
)
|
||||||
|
matching_room_list = set([
|
||||||
|
r["room_id"] for r in room_entries if
|
||||||
|
service.is_interested_in_room(r["room_id"])
|
||||||
|
])
|
||||||
|
|
||||||
|
# resolve room IDs for matching room alias regex.
|
||||||
|
room_alias_mappings = self._simple_select_list_txn(
|
||||||
|
txn=txn, table="room_aliases", keyvalues=None,
|
||||||
|
retcols=["room_id", "room_alias"]
|
||||||
|
)
|
||||||
|
matching_room_list |= set([
|
||||||
|
r["room_id"] for r in room_alias_mappings if
|
||||||
|
service.is_interested_in_alias(r["room_alias"])
|
||||||
|
])
|
||||||
|
|
||||||
|
# get all rooms for every user for this AS. This is scoped to users on
|
||||||
|
# this HS only.
|
||||||
|
user_list = self._simple_select_list_txn(
|
||||||
|
txn=txn, table="users", keyvalues=None, retcols=["name"]
|
||||||
|
)
|
||||||
|
user_list = [
|
||||||
|
u["name"] for u in user_list if
|
||||||
|
service.is_interested_in_user(u["name"])
|
||||||
|
]
|
||||||
|
rooms_for_user_matching_user_id = set() # RoomsForUser list
|
||||||
|
for user_id in user_list:
|
||||||
|
# FIXME: This assumes this store is linked with RoomMemberStore :(
|
||||||
|
rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
|
||||||
|
txn=txn,
|
||||||
|
user_id=user_id,
|
||||||
|
membership_list=[Membership.JOIN]
|
||||||
|
)
|
||||||
|
rooms_for_user_matching_user_id |= set(rooms_for_user)
|
||||||
|
|
||||||
|
# make RoomsForUser tuples for room ids and aliases which are not in the
|
||||||
|
# main rooms_for_user_list - e.g. they are rooms which do not have AS
|
||||||
|
# registered users in it.
|
||||||
|
known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
|
||||||
|
missing_rooms_for_user = [
|
||||||
|
RoomsForUser(r, service.sender, "join") for r in
|
||||||
|
matching_room_list if r not in known_room_ids
|
||||||
|
]
|
||||||
|
rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
|
||||||
|
|
||||||
|
return rooms_for_user_matching_user_id
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _populate_cache(self):
|
def _populate_cache(self):
|
||||||
"""Populates the ApplicationServiceCache from the database."""
|
"""Populates the ApplicationServiceCache from the database."""
|
||||||
|
|
|
@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
if not membership_list:
|
if not membership_list:
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_rooms_for_user_where_membership_is",
|
||||||
|
self._get_rooms_for_user_where_membership_is_txn,
|
||||||
|
user_id, membership_list
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
|
||||||
|
membership_list):
|
||||||
where_clause = "user_id = ? AND (%s)" % (
|
where_clause = "user_id = ? AND (%s)" % (
|
||||||
" OR ".join(["membership = ?" for _ in membership_list]),
|
" OR ".join(["membership = ?" for _ in membership_list]),
|
||||||
)
|
)
|
||||||
|
@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
args = [user_id]
|
args = [user_id]
|
||||||
args.extend(membership_list)
|
args.extend(membership_list)
|
||||||
|
|
||||||
def f(txn):
|
sql = (
|
||||||
sql = (
|
"SELECT m.room_id, m.sender, m.membership"
|
||||||
"SELECT m.room_id, m.sender, m.membership"
|
" FROM room_memberships as m"
|
||||||
" FROM room_memberships as m"
|
" INNER JOIN current_state_events as c"
|
||||||
" INNER JOIN current_state_events as c"
|
" ON m.event_id = c.event_id"
|
||||||
" ON m.event_id = c.event_id"
|
" WHERE %s"
|
||||||
" WHERE %s"
|
) % (where_clause,)
|
||||||
) % (where_clause,)
|
|
||||||
|
|
||||||
txn.execute(sql, args)
|
txn.execute(sql, args)
|
||||||
return [
|
return [
|
||||||
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
|
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
|
||||||
]
|
]
|
||||||
|
|
||||||
return self.runInteraction(
|
|
||||||
"get_rooms_for_user_where_membership_is",
|
|
||||||
f
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_joined_hosts_for_room(self, room_id):
|
def get_joined_hosts_for_room(self, room_id):
|
||||||
return self._simple_select_onecol(
|
return self._simple_select_onecol(
|
||||||
|
|
|
@ -36,6 +36,7 @@ what sort order was used:
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
|
|
||||||
|
@ -127,6 +128,85 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
|
||||||
|
|
||||||
|
|
||||||
class StreamStore(SQLBaseStore):
|
class StreamStore(SQLBaseStore):
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
|
||||||
|
# NB this lives here instead of appservice.py so we can reuse the
|
||||||
|
# 'private' StreamToken class in this file.
|
||||||
|
if limit:
|
||||||
|
limit = max(limit, MAX_STREAM_SIZE)
|
||||||
|
else:
|
||||||
|
limit = MAX_STREAM_SIZE
|
||||||
|
|
||||||
|
# From and to keys should be integers from ordering.
|
||||||
|
from_id = _StreamToken.parse_stream_token(from_key)
|
||||||
|
to_id = _StreamToken.parse_stream_token(to_key)
|
||||||
|
|
||||||
|
if from_key == to_key:
|
||||||
|
defer.returnValue(([], to_key))
|
||||||
|
return
|
||||||
|
|
||||||
|
# select all the events between from/to with a sensible limit
|
||||||
|
sql = (
|
||||||
|
"SELECT e.event_id, e.room_id, e.type, s.state_key, "
|
||||||
|
"e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON "
|
||||||
|
"e.event_id = s.event_id "
|
||||||
|
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
|
||||||
|
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
|
||||||
|
) % {
|
||||||
|
"limit": limit
|
||||||
|
}
|
||||||
|
|
||||||
|
def f(txn):
|
||||||
|
# pull out all the events between the tokens
|
||||||
|
txn.execute(sql, (from_id.stream, to_id.stream,))
|
||||||
|
rows = self.cursor_to_dict(txn)
|
||||||
|
|
||||||
|
# Logic:
|
||||||
|
# - We want ALL events which match the AS room_id regex
|
||||||
|
# - We want ALL events which match the rooms represented by the AS
|
||||||
|
# room_alias regex
|
||||||
|
# - We want ALL events for rooms that AS users have joined.
|
||||||
|
# This is currently supported via get_app_service_rooms (which is
|
||||||
|
# used for the Notifier listener rooms). We can't reasonably make a
|
||||||
|
# SQL query for these room IDs, so we'll pull all the events between
|
||||||
|
# from/to and filter in python.
|
||||||
|
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
|
||||||
|
room_ids_for_as = [r.room_id for r in rooms_for_as]
|
||||||
|
|
||||||
|
def app_service_interested(row):
|
||||||
|
if row["room_id"] in room_ids_for_as:
|
||||||
|
return True
|
||||||
|
|
||||||
|
if row["type"] == EventTypes.Member:
|
||||||
|
if service.is_interested_in_user(row.get("state_key")):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
ret = self._get_events_txn(
|
||||||
|
txn,
|
||||||
|
# apply the filter on the room id list
|
||||||
|
[
|
||||||
|
r["event_id"] for r in rows
|
||||||
|
if app_service_interested(r)
|
||||||
|
],
|
||||||
|
get_prev_content=True
|
||||||
|
)
|
||||||
|
|
||||||
|
self._set_before_and_after(ret, rows)
|
||||||
|
|
||||||
|
if rows:
|
||||||
|
key = "s%d" % max(r["stream_ordering"] for r in rows)
|
||||||
|
else:
|
||||||
|
# Assume we didn't get anything because there was nothing to
|
||||||
|
# get.
|
||||||
|
key = to_key
|
||||||
|
|
||||||
|
return ret, key
|
||||||
|
|
||||||
|
results = yield self.runInteraction("get_appservice_room_stream", f)
|
||||||
|
defer.returnValue(results)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|
||||||
limit=0, with_feedback=False):
|
limit=0, with_feedback=False):
|
||||||
|
@ -184,8 +264,7 @@ class StreamStore(SQLBaseStore):
|
||||||
self._set_before_and_after(ret, rows)
|
self._set_before_and_after(ret, rows)
|
||||||
|
|
||||||
if rows:
|
if rows:
|
||||||
key = "s%d" % max([r["stream_ordering"] for r in rows])
|
key = "s%d" % max(r["stream_ordering"] for r in rows)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Assume we didn't get anything because there was nothing to
|
# Assume we didn't get anything because there was nothing to
|
||||||
# get.
|
# get.
|
||||||
|
|
|
@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
|
||||||
|
|
||||||
self.mock_datastore = hs.get_datastore()
|
self.mock_datastore = hs.get_datastore()
|
||||||
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
|
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
|
||||||
|
self.mock_datastore.get_app_service_by_user_id = Mock(
|
||||||
|
return_value=defer.succeed(None)
|
||||||
|
)
|
||||||
|
|
||||||
def get_profile_displayname(user_id):
|
def get_profile_displayname(user_id):
|
||||||
return defer.succeed("Frank")
|
return defer.succeed("Frank")
|
||||||
|
|
Loading…
Reference in a new issue