mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-27 03:58:06 +03:00
Merge remote-tracking branch 'origin/develop' into store_event_actions
This commit is contained in:
commit
442fcc02f7
6 changed files with 66 additions and 148 deletions
|
@ -120,22 +120,6 @@ class AuthError(SynapseError):
|
|||
super(AuthError, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class GuestAccessError(AuthError):
|
||||
"""An error raised when a there is a problem with a guest user accessing
|
||||
a room"""
|
||||
|
||||
def __init__(self, rooms, *args, **kwargs):
|
||||
self.rooms = rooms
|
||||
super(GuestAccessError, self).__init__(*args, **kwargs)
|
||||
|
||||
def error_dict(self):
|
||||
return cs_error(
|
||||
self.msg,
|
||||
self.errcode,
|
||||
rooms=self.rooms,
|
||||
)
|
||||
|
||||
|
||||
class EventSizeError(SynapseError):
|
||||
"""An error raised when an event is too big."""
|
||||
|
||||
|
|
|
@ -149,9 +149,6 @@ class FilterCollection(object):
|
|||
"include_leave", False
|
||||
)
|
||||
|
||||
def list_rooms(self):
|
||||
return self.room_filter.list_rooms()
|
||||
|
||||
def timeline_limit(self):
|
||||
return self.room_timeline_filter.limit()
|
||||
|
||||
|
@ -184,15 +181,6 @@ class Filter(object):
|
|||
def __init__(self, filter_json):
|
||||
self.filter_json = filter_json
|
||||
|
||||
def list_rooms(self):
|
||||
"""The list of room_id strings this filter restricts the output to
|
||||
or None if the this filter doesn't list the room ids.
|
||||
"""
|
||||
if "rooms" in self.filter_json:
|
||||
return list(set(self.filter_json["rooms"]))
|
||||
else:
|
||||
return None
|
||||
|
||||
def check(self, event):
|
||||
"""Checks whether the filter matches the given event.
|
||||
|
||||
|
|
|
@ -688,6 +688,7 @@ def run(hs):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def phone_stats_home():
|
||||
logger.info("Gathering stats for reporting")
|
||||
now = int(hs.get_clock().time())
|
||||
uptime = int(now - start_time)
|
||||
if uptime < 0:
|
||||
|
@ -718,6 +719,7 @@ def run(hs):
|
|||
|
||||
if hs.config.report_stats:
|
||||
phone_home_task = task.LoopingCall(phone_stats_home)
|
||||
logger.info("Scheduling stats reporting for 24 hour intervals")
|
||||
phone_home_task.start(60 * 60 * 24, now=False)
|
||||
|
||||
def in_thread():
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
from ._base import BaseHandler
|
||||
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.api.constants import Membership, EventTypes
|
||||
from synapse.api.errors import GuestAccessError
|
||||
from synapse.util import unwrapFirstError
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -29,8 +29,8 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
SyncConfig = collections.namedtuple("SyncConfig", [
|
||||
"user",
|
||||
"is_guest",
|
||||
"filter",
|
||||
"is_guest",
|
||||
])
|
||||
|
||||
|
||||
|
@ -121,8 +121,6 @@ class SyncResult(collections.namedtuple("SyncResult", [
|
|||
self.presence or self.joined or self.invited or self.archived
|
||||
)
|
||||
|
||||
GuestRoom = collections.namedtuple("GuestRoom", ("room_id", "membership"))
|
||||
|
||||
|
||||
class SyncHandler(BaseHandler):
|
||||
|
||||
|
@ -141,18 +139,6 @@ class SyncHandler(BaseHandler):
|
|||
A Deferred SyncResult.
|
||||
"""
|
||||
|
||||
if sync_config.is_guest:
|
||||
bad_rooms = []
|
||||
for room_id in sync_config.filter.list_rooms():
|
||||
world_readable = yield self._is_world_readable(room_id)
|
||||
if not world_readable:
|
||||
bad_rooms.append(room_id)
|
||||
|
||||
if bad_rooms:
|
||||
raise GuestAccessError(
|
||||
bad_rooms, 403, "Guest access not allowed"
|
||||
)
|
||||
|
||||
if timeout == 0 or since_token is None or full_state:
|
||||
# we are going to return immediately, so don't bother calling
|
||||
# notifier.wait_for_events.
|
||||
|
@ -169,17 +155,6 @@ class SyncHandler(BaseHandler):
|
|||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _is_world_readable(self, room_id):
|
||||
state = yield self.hs.get_state_handler().get_current_state(
|
||||
room_id,
|
||||
EventTypes.RoomHistoryVisibility
|
||||
)
|
||||
if state and "history_visibility" in state.content:
|
||||
defer.returnValue(state.content["history_visibility"] == "world_readable")
|
||||
else:
|
||||
defer.returnValue(False)
|
||||
|
||||
def current_sync_for_user(self, sync_config, since_token=None,
|
||||
full_state=False):
|
||||
"""Get the sync for client needed to match what the server has now.
|
||||
|
@ -215,52 +190,37 @@ class SyncHandler(BaseHandler):
|
|||
"""
|
||||
now_token = yield self.event_sources.get_current_token()
|
||||
|
||||
if sync_config.is_guest:
|
||||
room_list = [
|
||||
GuestRoom(room_id, Membership.JOIN)
|
||||
for room_id in sync_config.filter.list_rooms()
|
||||
]
|
||||
|
||||
account_data = {}
|
||||
account_data_by_room = {}
|
||||
tags_by_room = {}
|
||||
|
||||
else:
|
||||
membership_list = (Membership.INVITE, Membership.JOIN)
|
||||
if sync_config.filter.include_leave:
|
||||
membership_list += (Membership.LEAVE, Membership.BAN)
|
||||
|
||||
room_list = yield self.store.get_rooms_for_user_where_membership_is(
|
||||
user_id=sync_config.user.to_string(),
|
||||
membership_list=membership_list
|
||||
)
|
||||
|
||||
account_data, account_data_by_room = (
|
||||
yield self.store.get_account_data_for_user(
|
||||
sync_config.user.to_string()
|
||||
)
|
||||
)
|
||||
|
||||
tags_by_room = yield self.store.get_tags_for_user(
|
||||
sync_config.user.to_string()
|
||||
)
|
||||
|
||||
presence_stream = self.event_sources.sources["presence"]
|
||||
|
||||
joined_room_ids = [
|
||||
room.room_id for room in room_list
|
||||
if room.membership == Membership.JOIN
|
||||
]
|
||||
|
||||
presence, _ = yield presence_stream.get_new_events(
|
||||
from_key=0,
|
||||
user=sync_config.user,
|
||||
room_ids=joined_room_ids,
|
||||
is_guest=sync_config.is_guest,
|
||||
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
|
||||
sync_config, now_token
|
||||
)
|
||||
|
||||
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
|
||||
sync_config, now_token, joined_room_ids
|
||||
presence_stream = self.event_sources.sources["presence"]
|
||||
# TODO (mjark): This looks wrong, shouldn't we be getting the presence
|
||||
# UP to the present rather than after the present?
|
||||
pagination_config = PaginationConfig(from_token=now_token)
|
||||
presence, _ = yield presence_stream.get_pagination_rows(
|
||||
user=sync_config.user,
|
||||
pagination_config=pagination_config.get_source_config("presence"),
|
||||
key=None
|
||||
)
|
||||
|
||||
membership_list = (Membership.INVITE, Membership.JOIN)
|
||||
if sync_config.filter.include_leave:
|
||||
membership_list += (Membership.LEAVE, Membership.BAN)
|
||||
|
||||
room_list = yield self.store.get_rooms_for_user_where_membership_is(
|
||||
user_id=sync_config.user.to_string(),
|
||||
membership_list=membership_list
|
||||
)
|
||||
|
||||
account_data, account_data_by_room = (
|
||||
yield self.store.get_account_data_for_user(
|
||||
sync_config.user.to_string()
|
||||
)
|
||||
)
|
||||
|
||||
tags_by_room = yield self.store.get_tags_for_user(
|
||||
sync_config.user.to_string()
|
||||
)
|
||||
|
||||
joined = []
|
||||
|
@ -379,13 +339,11 @@ class SyncHandler(BaseHandler):
|
|||
return account_data_events
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def ephemeral_by_room(self, sync_config, now_token, room_ids,
|
||||
since_token=None):
|
||||
def ephemeral_by_room(self, sync_config, now_token, since_token=None):
|
||||
"""Get the ephemeral events for each room the user is in
|
||||
Args:
|
||||
sync_config (SyncConfig): The flags, filters and user for the sync.
|
||||
now_token (StreamToken): Where the server is currently up to.
|
||||
room_ids (list): List of room id strings to get data for.
|
||||
since_token (StreamToken): Where the server was when the client
|
||||
last synced.
|
||||
Returns:
|
||||
|
@ -396,13 +354,16 @@ class SyncHandler(BaseHandler):
|
|||
|
||||
typing_key = since_token.typing_key if since_token else "0"
|
||||
|
||||
rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
|
||||
typing_source = self.event_sources.sources["typing"]
|
||||
typing, typing_key = yield typing_source.get_new_events(
|
||||
user=sync_config.user,
|
||||
from_key=typing_key,
|
||||
limit=sync_config.filter.ephemeral_limit(),
|
||||
room_ids=room_ids,
|
||||
is_guest=False,
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
now_token = now_token.copy_and_replace("typing_key", typing_key)
|
||||
|
||||
|
@ -425,8 +386,7 @@ class SyncHandler(BaseHandler):
|
|||
from_key=receipt_key,
|
||||
limit=sync_config.filter.ephemeral_limit(),
|
||||
room_ids=room_ids,
|
||||
# /sync doesn't support guest access, they can't get to this point in code
|
||||
is_guest=False,
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
now_token = now_token.copy_and_replace("receipt_key", receipt_key)
|
||||
|
||||
|
@ -473,38 +433,8 @@ class SyncHandler(BaseHandler):
|
|||
"""
|
||||
now_token = yield self.event_sources.get_current_token()
|
||||
|
||||
if sync_config.is_guest:
|
||||
room_ids = sync_config.filter.list_rooms()
|
||||
|
||||
tags_by_room = {}
|
||||
account_data = {}
|
||||
account_data_by_room = {}
|
||||
|
||||
else:
|
||||
rooms = yield self.store.get_rooms_for_user(
|
||||
sync_config.user.to_string()
|
||||
)
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
|
||||
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
|
||||
sync_config, now_token, since_token
|
||||
)
|
||||
|
||||
tags_by_room = yield self.store.get_updated_tags(
|
||||
sync_config.user.to_string(),
|
||||
since_token.account_data_key,
|
||||
)
|
||||
|
||||
account_data, account_data_by_room = (
|
||||
yield self.store.get_updated_account_data_for_user(
|
||||
sync_config.user.to_string(),
|
||||
since_token.account_data_key,
|
||||
)
|
||||
)
|
||||
|
||||
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
|
||||
sync_config, now_token, room_ids, since_token
|
||||
)
|
||||
rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
|
||||
presence_source = self.event_sources.sources["presence"]
|
||||
presence, presence_key = yield presence_source.get_new_events(
|
||||
|
@ -520,11 +450,11 @@ class SyncHandler(BaseHandler):
|
|||
# this users current read receipt. This could almost certainly be
|
||||
# optimised.
|
||||
_, all_ephemeral_by_room = yield self.ephemeral_by_room(
|
||||
sync_config, now_token, room_ids
|
||||
sync_config, now_token
|
||||
)
|
||||
|
||||
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
|
||||
sync_config, now_token, room_ids, since_token
|
||||
sync_config, now_token, since_token
|
||||
)
|
||||
|
||||
rm_handler = self.hs.get_handlers().room_member_handler
|
||||
|
@ -546,8 +476,18 @@ class SyncHandler(BaseHandler):
|
|||
from_key=since_token.room_key,
|
||||
to_key=now_token.room_key,
|
||||
limit=timeline_limit + 1,
|
||||
room_ids=room_ids if sync_config.is_guest else (),
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
|
||||
tags_by_room = yield self.store.get_updated_tags(
|
||||
sync_config.user.to_string(),
|
||||
since_token.account_data_key,
|
||||
)
|
||||
|
||||
account_data, account_data_by_room = (
|
||||
yield self.store.get_updated_account_data_for_user(
|
||||
sync_config.user.to_string(),
|
||||
since_token.account_data_key,
|
||||
)
|
||||
)
|
||||
|
||||
joined = []
|
||||
|
|
|
@ -120,15 +120,10 @@ class SyncRestServlet(RestServlet):
|
|||
except:
|
||||
filter = FilterCollection({})
|
||||
|
||||
if is_guest and filter.list_rooms() is None:
|
||||
raise SynapseError(
|
||||
400, "Guest users must provide a list of rooms in the filter"
|
||||
)
|
||||
|
||||
sync_config = SyncConfig(
|
||||
user=user,
|
||||
is_guest=is_guest,
|
||||
filter=filter,
|
||||
is_guest=is_guest,
|
||||
)
|
||||
|
||||
if since is not None:
|
||||
|
|
|
@ -936,6 +936,7 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
now_reporting = self.cursor_to_dict(txn)
|
||||
if not now_reporting:
|
||||
logger.info("Calculating daily messages skipped; no now_reporting")
|
||||
return None
|
||||
now_reporting = now_reporting[0]["stream_ordering"]
|
||||
|
||||
|
@ -948,11 +949,18 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
if not last_reported:
|
||||
logger.info("Calculating daily messages skipped; no last_reported")
|
||||
return None
|
||||
|
||||
# Close enough to correct for our purposes.
|
||||
yesterday = (now - 24 * 60 * 60)
|
||||
if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
|
||||
since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
|
||||
any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
|
||||
if any_since_yesterday:
|
||||
logger.info(
|
||||
"Calculating daily messages skipped; since_yesterday_seconds: %d" %
|
||||
(since_yesterday_seconds,)
|
||||
)
|
||||
return None
|
||||
|
||||
txn.execute(
|
||||
|
@ -968,6 +976,7 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if not rows:
|
||||
logger.info("Calculating daily messages skipped; messages count missing")
|
||||
return None
|
||||
return rows[0]["messages"]
|
||||
|
||||
|
|
Loading…
Reference in a new issue