Mirror of https://github.com/element-hq/synapse.git
Merge branch 'develop' into application-services
Commit 9978c5c103
26 changed files with 240 additions and 115 deletions
VERSION (1 change)

@@ -1 +0,0 @@
-0.6.1d
setup.py (61 changes)

@@ -18,51 +18,42 @@ import os
 from setuptools import setup, find_packages


-# Utility function to read the README file.
-# Used for the long_description. It's nice, because now 1) we have a top level
-# README file and 2) it's easier to type in the README file than to put a raw
-# string in below ...
-def read(fname):
-    return open(os.path.join(os.path.dirname(__file__), fname)).read()
+here = os.path.abspath(os.path.dirname(__file__))
+
+
+def read_file(path_segments):
+    """Read a file from the package. Takes a list of strings to join to
+    make the path"""
+    file_path = os.path.join(here, *path_segments)
+    with open(file_path) as f:
+        return f.read()
+
+
+def exec_file(path_segments):
+    """Execute a single python file to get the variables defined in it"""
+    result = {}
+    code = read_file(path_segments)
+    exec(code, result)
+    return result
+
+version = exec_file(("synapse", "__init__.py"))["__version__"]
+dependencies = exec_file(("synapse", "python_dependencies.py"))
+long_description = read_file(("README.rst",))
+
 setup(
     name="matrix-synapse",
-    version=read("VERSION").strip(),
+    version=version,
     packages=find_packages(exclude=["tests", "tests.*"]),
     description="Reference Synapse Home Server",
-    install_requires=[
-        "syutil==0.0.2",
-        "matrix_angular_sdk>=0.6.1",
-        "Twisted==14.0.2",
-        "service_identity>=1.0.0",
-        "pyopenssl>=0.14",
-        "pyyaml",
-        "pyasn1",
-        "pynacl",
-        "daemonize",
-        "py-bcrypt",
-        "frozendict>=0.4",
-        "pillow",
-        "pydenticon",
-    ],
-    dependency_links=[
-        "https://github.com/matrix-org/syutil/tarball/v0.0.2#egg=syutil-0.0.2",
-        "https://github.com/pyca/pynacl/tarball/d4d3175589b892f6ea7c22f466e0e223853516fa#egg=pynacl-0.3.0",
-        "https://github.com/matrix-org/matrix-angular-sdk/tarball/v0.6.1/#egg=matrix_angular_sdk-0.6.1",
-    ],
+    install_requires=dependencies["REQUIREMENTS"].keys(),
     setup_requires=[
         "Twisted==14.0.2", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
         "setuptools_trial",
-        "setuptools>=1.0.0", # Needs setuptools that supports git+ssh.
-        # TODO: Do we need this now? we don't use git+ssh.
         "mock"
     ],
+    dependency_links=dependencies["DEPENDENCY_LINKS"],
     include_package_data=True,
     zip_safe=False,
-    long_description=read("README.rst"),
-    entry_points="""
-    [console_scripts]
-    synctl=synapse.app.synctl:main
-    synapse-homeserver=synapse.app.homeserver:main
-    """
+    long_description=long_description,
+    scripts=["synctl"],
 )
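The rewritten setup.py pulls its metadata out of the package itself: the version from synapse/__init__.py and the dependency tables from synapse/python_dependencies.py, each loaded with exec() so that setup.py keeps working before the package or any of its dependencies are installed. A minimal, self-contained sketch of that pattern, using the same file names as the diff above:

import os

here = os.path.abspath(os.path.dirname(__file__))


def read_file(path_segments):
    # Read a file relative to setup.py, given path segments to join.
    file_path = os.path.join(here, *path_segments)
    with open(file_path) as f:
        return f.read()


def exec_file(path_segments):
    # Execute a Python source file and return the names it defines.
    result = {}
    exec(read_file(path_segments), result)
    return result


# Nothing is imported, so this works in a bare environment.
version = exec_file(("synapse", "__init__.py"))["__version__"]
dependencies = exec_file(("synapse", "python_dependencies.py"))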
@@ -89,12 +89,19 @@ class Auth(object):
             raise

     @defer.inlineCallbacks
-    def check_joined_room(self, room_id, user_id):
-        member = yield self.state.get_current_state(
-            room_id=room_id,
-            event_type=EventTypes.Member,
-            state_key=user_id
-        )
+    def check_joined_room(self, room_id, user_id, current_state=None):
+        if current_state:
+            member = current_state.get(
+                (EventTypes.Member, user_id),
+                None
+            )
+        else:
+            member = yield self.state.get_current_state(
+                room_id=room_id,
+                event_type=EventTypes.Member,
+                state_key=user_id
+            )

         self._check_joined_room(member, user_id, room_id)
         defer.returnValue(member)
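Several hunks in this commit are knock-on effects of one change, visible in the StateHandler hunk near the end: get_current_state() without an event type now returns a dict keyed by (event_type, state_key) tuples rather than a list of events. Point lookups become dict .get() calls and whole-state iteration needs .values(). A toy illustration of the two access patterns; the Event type here is a stand-in, not Synapse's event class:

from collections import namedtuple

# Stand-in for a state event; Synapse's real events carry much more.
Event = namedtuple("Event", ["type", "state_key", "content"])

current_state = {
    ("m.room.member", "@alice:example.com"):
        Event("m.room.member", "@alice:example.com", {"membership": "join"}),
    ("m.room.name", ""): Event("m.room.name", "", {"name": "A room"}),
}

# Point lookup, as in the new check_joined_room:
member = current_state.get(("m.room.member", "@alice:example.com"), None)
assert member.content["membership"] == "join"

# Iteration now goes through .values(), as in check_host_in_room:
members = [e for e in current_state.values() if e.type == "m.room.member"]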
@@ -102,7 +109,7 @@ class Auth(object):
     def check_host_in_room(self, room_id, host):
         curr_state = yield self.state.get_current_state(room_id)

-        for event in curr_state:
+        for event in curr_state.values():
             if event.type == EventTypes.Member:
                 try:
                     if UserID.from_string(event.state_key).domain != host:
@@ -139,7 +139,7 @@ class SynapseHomeServer(HomeServer):
         logger.info("Attaching %s to path %s", resource, full_path)
         last_resource = self.root_resource
         for path_seg in full_path.split('/')[1:-1]:
-            if not path_seg in last_resource.listNames():
+            if path_seg not in last_resource.listNames():
                 # resource doesn't exist, so make a "dummy resource"
                 child_resource = Resource()
                 last_resource.putChild(path_seg, child_resource)
@@ -50,8 +50,9 @@ class Config(object):
         )
         return cls.abspath(file_path)

-    @staticmethod
-    def ensure_directory(dir_path):
+    @classmethod
+    def ensure_directory(cls, dir_path):
+        dir_path = cls.abspath(dir_path)
         if not os.path.exists(dir_path):
             os.makedirs(dir_path)
         if not os.path.isdir(dir_path):
@@ -18,6 +18,7 @@ from synapse.util.logcontext import LoggingContextFilter
 from twisted.python.log import PythonLoggingObserver
 import logging
 import logging.config
+import yaml


 class LoggingConfig(Config):
@@ -79,7 +80,8 @@ class LoggingConfig(Config):
             logger.addHandler(handler)
             logger.info("Test")
         else:
-            logging.config.fileConfig(self.log_config)
+            with open(self.log_config, 'r') as f:
+                logging.config.dictConfig(yaml.load(f))

         observer = PythonLoggingObserver()
         observer.start()
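The logging change swaps logging.config.fileConfig (INI syntax) for logging.config.dictConfig fed from a YAML file, so the log config shares the format of Synapse's other config files. A minimal runnable sketch with an illustrative config, not Synapse's shipped one; note that dictConfig requires the version key, and that on current PyYAML yaml.safe_load is the safer spelling of the yaml.load(f) call in the diff:

import logging
import logging.config

import yaml

# Illustrative YAML logging config, inlined here instead of read from a file.
LOG_CONFIG = """
version: 1
formatters:
  precise:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    formatter: precise
root:
  level: INFO
  handlers: [console]
"""

logging.config.dictConfig(yaml.safe_load(LOG_CONFIG))
logging.getLogger("synapse").info("dictConfig-based logging configured")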
@@ -75,7 +75,7 @@ class SynapseKeyClientProtocol(HTTPClient):

     def handleStatus(self, version, status, message):
         if status != b"200":
-            #logger.info("Non-200 response from %s: %s %s",
+            # logger.info("Non-200 response from %s: %s %s",
             #    self.transport.getHost(), status, message)
             self.transport.abortConnection()

@@ -83,7 +83,7 @@ class SynapseKeyClientProtocol(HTTPClient):
         try:
             json_response = json.loads(response_body_bytes)
         except ValueError:
-            #logger.info("Invalid JSON response from %s",
+            # logger.info("Invalid JSON response from %s",
             #    self.transport.getHost())
             self.transport.abortConnection()
             return
@@ -25,6 +25,8 @@ from synapse.events import FrozenEvent

 from synapse.api.errors import FederationError, SynapseError

+from synapse.crypto.event_signing import compute_event_signature
+
 import logging


@@ -156,6 +158,15 @@ class FederationServer(FederationBase):
             auth_chain = yield self.store.get_auth_chain(
                 [pdu.event_id for pdu in pdus]
             )
+
+            for event in auth_chain:
+                event.signatures.update(
+                    compute_event_signature(
+                        event,
+                        self.hs.hostname,
+                        self.hs.config.signing_key[0]
+                    )
+                )
         else:
             raise NotImplementedError("Specify an event")

@@ -157,15 +157,19 @@ class TransactionQueue(object):
         else:
             logger.info("TX [%s] is ready for retry", destination)

-        logger.info("TX [%s] _attempt_new_transaction", destination)
-
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
             # request at which point pending_pdus_by_dest just keeps growing.
             # we need application-layer timeouts of some flavour of these
             # requests
+            logger.info(
+                "TX [%s] Transaction already in progress",
+                destination
+            )
             return

+        logger.info("TX [%s] _attempt_new_transaction", destination)
+
         # list of (pending_pdu, deferred, order)
         pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
         pending_edus = self.pending_edus_by_dest.pop(destination, [])
@@ -176,6 +180,7 @@ class TransactionQueue(object):
             destination, len(pending_pdus))

         if not pending_pdus and not pending_edus and not pending_failures:
+            logger.info("TX [%s] Nothing to send", destination)
             return

         logger.debug(
@@ -860,9 +860,14 @@ class FederationHandler(BaseHandler):
         # Only do auth resolution if we have something new to say.
         # We can't rove an auth failure.
         do_resolution = False
+
+        provable = [
+            RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
+        ]
+
         for e_id in different_auth:
             if e_id in have_events:
-                if have_events[e_id] != RejectedReason.AUTH_ERROR:
+                if have_events[e_id] in provable:
                     do_resolution = True
                     break

@@ -35,6 +35,7 @@ class MessageHandler(BaseHandler):
     def __init__(self, hs):
         super(MessageHandler, self).__init__(hs)
         self.hs = hs
+        self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()

@@ -225,7 +226,9 @@ class MessageHandler(BaseHandler):
         # TODO: This is duplicating logic from snapshot_all_rooms
         current_state = yield self.state_handler.get_current_state(room_id)
         now = self.clock.time_msec()
-        defer.returnValue([serialize_event(c, now) for c in current_state])
+        defer.returnValue(
+            [serialize_event(c, now) for c in current_state.values()]
+        )

     @defer.inlineCallbacks
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
@@ -313,7 +316,7 @@ class MessageHandler(BaseHandler):
                 )
                 d["state"] = [
                     serialize_event(c, time_now, as_client_event)
-                    for c in current_state
+                    for c in current_state.values()
                 ]
             except:
                 logger.exception("Failed to get snapshot")
@@ -329,7 +332,14 @@ class MessageHandler(BaseHandler):
     @defer.inlineCallbacks
     def room_initial_sync(self, user_id, room_id, pagin_config=None,
                           feedback=False):
-        yield self.auth.check_joined_room(room_id, user_id)
+        current_state = yield self.state.get_current_state(
+            room_id=room_id,
+        )
+
+        yield self.auth.check_joined_room(
+            room_id, user_id,
+            current_state=current_state
+        )

         # TODO(paul): I wish I was called with user objects not user_id
         # strings...
@@ -337,13 +347,12 @@ class MessageHandler(BaseHandler):

         # TODO: These concurrently
         time_now = self.clock.time_msec()
-        state_tuples = yield self.state_handler.get_current_state(room_id)
-        state = [serialize_event(x, time_now) for x in state_tuples]
+        state = [
+            serialize_event(x, time_now)
+            for x in current_state.values()
+        ]

-        member_event = (yield self.store.get_room_member(
-            user_id=user_id,
-            room_id=room_id
-        ))
+        member_event = current_state.get((EventTypes.Member, user_id,))

         now_token = yield self.hs.get_event_sources().get_current_token()

@@ -360,7 +369,10 @@ class MessageHandler(BaseHandler):
             start_token = now_token.copy_and_replace("room_key", token[0])
             end_token = now_token.copy_and_replace("room_key", token[1])

-        room_members = yield self.store.get_room_members(room_id)
+        room_members = [
+            m for m in current_state.values()
+            if m.type == EventTypes.Member
+        ]

         presence_handler = self.hs.get_handlers().presence_handler
         presence = []
@@ -457,9 +457,9 @@ class PresenceHandler(BaseHandler):
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
         else:
             # statuscache = self._get_or_make_usercache(user)
             # self._user_cachemap_latest_serial += 1
             # statuscache.update(state, self._user_cachemap_latest_serial)
             pass

         yield self.push_update_to_local_and_remote(
@@ -658,7 +658,9 @@ class PresenceHandler(BaseHandler):

         observers = set(self._remote_recvmap.get(user, set()))
         if observers:
-            logger.debug(" | %d interested local observers %r", len(observers), observers)
+            logger.debug(
+                " | %d interested local observers %r", len(observers), observers
+            )

         rm_handler = self.homeserver.get_handlers().room_member_handler
         room_ids = yield rm_handler.get_rooms_for_user(user)
@@ -707,7 +709,7 @@ class PresenceHandler(BaseHandler):

         # TODO(paul) permissions checks

-        if not user in self._remote_sendmap:
+        if user not in self._remote_sendmap:
             self._remote_sendmap[user] = set()

         self._remote_sendmap[user].add(origin)
@@ -110,14 +110,17 @@ class RegistrationHandler(BaseHandler):
         # do it here.
         try:
             auth_user = UserID.from_string(user_id)
-            identicon_resource = self.hs.get_resource_for_media_repository().getChildWithDefault("identicon", None)
-            upload_resource = self.hs.get_resource_for_media_repository().getChildWithDefault("upload", None)
+            media_repository = self.hs.get_resource_for_media_repository()
+            identicon_resource = media_repository.getChildWithDefault("identicon", None)
+            upload_resource = media_repository.getChildWithDefault("upload", None)
             identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320)
             content_uri = yield upload_resource.create_content(
                 "image/png", None, identicon_bytes, len(identicon_bytes), auth_user
             )
             profile_handler = self.hs.get_handlers().profile_handler
-            profile_handler.set_avatar_url(auth_user, auth_user, ("%s#auto" % content_uri))
+            profile_handler.set_avatar_url(
+                auth_user, auth_user, ("%s#auto" % (content_uri,))
+            )
         except NotImplementedError:
             pass # make tests pass without messing around creating default avatars

@@ -114,7 +114,7 @@ class SyncHandler(BaseHandler):
         if sync_config.gap:
             return self.incremental_sync_with_gap(sync_config, since_token)
         else:
-            #TODO(mjark): Handle gapless sync
+            # TODO(mjark): Handle gapless sync
             raise NotImplementedError()

     @defer.inlineCallbacks
@@ -175,9 +175,10 @@ class SyncHandler(BaseHandler):
             room_id, sync_config, now_token,
         )

-        current_state_events = yield self.state_handler.get_current_state(
+        current_state = yield self.state_handler.get_current_state(
             room_id
         )
+        current_state_events = current_state.values()

         defer.returnValue(RoomSyncResult(
             room_id=room_id,
@@ -347,9 +348,10 @@ class SyncHandler(BaseHandler):

         # TODO(mjark): This seems racy since this isn't being passed a
         # token to indicate what point in the stream this is
-        current_state_events = yield self.state_handler.get_current_state(
+        current_state = yield self.state_handler.get_current_state(
             room_id
         )
+        current_state_events = current_state.values()

         state_at_previous_sync = yield self.get_state_at_previous_sync(
             room_id, since_token=since_token
@@ -431,6 +433,7 @@ class SyncHandler(BaseHandler):
             joined = True

         if joined:
-            state_delta = yield self.state_handler.get_current_state(room_id)
+            res = yield self.state_handler.get_current_state(room_id)
+            state_delta = res.values()

         defer.returnValue(state_delta)
@@ -147,11 +147,14 @@ class Pusher(object):
                 logger.warn("event_match condition with no pattern")
                 return False
             # XXX: optimisation: cache our pattern regexps
-            r = r'\b%s\b' % self._glob_to_regexp(condition['pattern'])
+            if condition['key'] == 'content.body':
+                r = r'\b%s\b' % self._glob_to_regexp(condition['pattern'])
+            else:
+                r = r'^%s$' % self._glob_to_regexp(condition['pattern'])
             val = _value_for_dotted_key(condition['key'], ev)
             if val is None:
                 return False
-            return re.match(r, val, flags=re.IGNORECASE) != None
+            return re.search(r, val, flags=re.IGNORECASE) is not None

         elif condition['kind'] == 'device':
             if 'profile_tag' not in condition:
@@ -167,8 +170,10 @@ class Pusher(object):
                 return False
             if not display_name:
                 return False
-            return re.match("\b%s\b" % re.escape(display_name),
-                            ev['content']['body'], flags=re.IGNORECASE) != None
+            return re.search(
+                "\b%s\b" % re.escape(display_name), ev['content']['body'],
+                flags=re.IGNORECASE
+            ) is not None

         elif condition['kind'] == 'room_member_count':
             if 'is' not in condition:
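Two fixes travel together in these hunks: re.match becomes re.search, and the != None comparisons become is not None. re.match only matches at the start of the string, so a \b-delimited word-boundary pattern could never fire on a keyword in the middle of a message body. A quick demonstration (note the first hunk builds its pattern from a raw string, while the display-name hunk still writes "\b%s\b" in a non-raw string, where \b is actually a backspace character rather than a word boundary):

import re

pattern = r"\bcake\b"            # word-boundary pattern, as built for content.body
body = "who ate the cake?"

print(re.match(pattern, body))   # None: re.match anchors at position 0
print(re.search(pattern, body))  # <re.Match ...>: found mid-string
print(re.search(pattern, body, flags=re.IGNORECASE) is not None)  # True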
@@ -1,5 +1,6 @@
 from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP

+
 def list_with_base_rules(rawrules, user_name):
     ruleslist = []

@@ -10,8 +11,8 @@ def list_with_base_rules(rawrules, user_name):
         while r['priority_class'] < current_prio_class:
             ruleslist.extend(make_base_rules(
                 user_name,
-                PRIORITY_CLASS_INVERSE_MAP[current_prio_class])
-            )
+                PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+            ))
             current_prio_class -= 1

         ruleslist.append(r)
@@ -19,8 +20,8 @@ def list_with_base_rules(rawrules, user_name):
     while current_prio_class > 0:
         ruleslist.extend(make_base_rules(
             user_name,
-            PRIORITY_CLASS_INVERSE_MAP[current_prio_class])
-        )
+            PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+        ))
         current_prio_class -= 1

     return ruleslist
@@ -4,5 +4,5 @@ PRIORITY_CLASS_MAP = {
     'room': 3,
     'content': 4,
     'override': 5,
 }
 PRIORITY_CLASS_INVERSE_MAP = {v: k for k, v in PRIORITY_CLASS_MAP.items()}
@@ -5,7 +5,7 @@ logger = logging.getLogger(__name__)

 REQUIREMENTS = {
     "syutil==0.0.2": ["syutil"],
-    "matrix_angular_sdk==0.6.0": ["syweb>=0.6.0"],
+    "matrix_angular_sdk>=0.6.1": ["syweb>=0.6.1"],
     "Twisted==14.0.2": ["twisted==14.0.2"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
     "pyopenssl>=0.14": ["OpenSSL>=0.14"],
@@ -19,10 +19,11 @@ REQUIREMENTS = {
     "pydenticon": ["pydenticon"],
 }

+
 def github_link(project, version, egg):
     return "https://github.com/%s/tarball/%s/#egg=%s" % (project, version, egg)

-DEPENDENCY_LINKS=[
+DEPENDENCY_LINKS = [
     github_link(
         project="matrix-org/syutil",
         version="v0.0.2",
@@ -30,8 +31,8 @@ DEPENDENCY_LINKS=[
     ),
     github_link(
         project="matrix-org/matrix-angular-sdk",
-        version="v0.6.0",
-        egg="matrix_angular_sdk-0.6.0",
+        version="v0.6.1",
+        egg="matrix_angular_sdk-0.6.1",
     ),
     github_link(
         project="pyca/pynacl",
@@ -101,6 +102,7 @@ def check_requirements():
                 % (dependency, file_path, version, required_version)
             )

+
 def list_requirements():
     result = []
     linked = []
@@ -111,7 +113,7 @@ def list_requirements():
     for requirement in REQUIREMENTS:
         is_linked = False
         for link in linked:
-            if requirement.replace('-','_').startswith(link):
+            if requirement.replace('-', '_').startswith(link):
                 is_linked = True
         if not is_linked:
             result.append(requirement)
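github_link() builds the pip dependency_links URL for a tagged GitHub tarball. For example, the matrix-angular-sdk entry above expands as follows:

def github_link(project, version, egg):
    return "https://github.com/%s/tarball/%s/#egg=%s" % (project, version, egg)

print(github_link(
    project="matrix-org/matrix-angular-sdk",
    version="v0.6.1",
    egg="matrix_angular_sdk-0.6.1",
))
# -> https://github.com/matrix-org/matrix-angular-sdk/tarball/v0.6.1/#egg=matrix_angular_sdk-0.6.1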
@@ -46,7 +46,7 @@ class ClientDirectoryServer(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_PUT(self, request, room_alias):
         content = _parse_json(request)
-        if not "room_id" in content:
+        if "room_id" not in content:
             raise SynapseError(400, "Missing room_id key",
                                errcode=Codes.BAD_JSON)

@@ -15,12 +15,17 @@

 from twisted.internet import defer

-from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError, NotFoundError, \
-    StoreError
+from synapse.api.errors import (
+    SynapseError, Codes, UnrecognizedRequestError, NotFoundError, StoreError
+)
 from .base import ClientV1RestServlet, client_path_pattern
-from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
+from synapse.storage.push_rule import (
+    InconsistentRuleException, RuleNotFoundException
+)
 import synapse.push.baserules as baserules
-from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
+from synapse.push.rulekinds import (
+    PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
+)

 import json

@@ -105,7 +110,9 @@ class PushRuleRestServlet(ClientV1RestServlet):
         # we build up the full structure and then decide which bits of it
         # to send which means doing unnecessary work sometimes but is
         # is probably not going to make a whole lot of difference
-        rawrules = yield self.hs.get_datastore().get_push_rules_for_user_name(user.to_string())
+        rawrules = yield self.hs.get_datastore().get_push_rules_for_user_name(
+            user.to_string()
+        )

         for r in rawrules:
             r["conditions"] = json.loads(r["conditions"])
@@ -383,6 +390,7 @@ def _namespaced_rule_id_from_spec(spec):
 def _rule_id_from_namespaced(in_rule_id):
     return in_rule_id.split('/')[-1]

+
 class InvalidRuleException(Exception):
     pass

@@ -118,7 +118,7 @@ class SyncRestServlet(RestServlet):
         except:
             filter = Filter({})
         # filter = filter.apply_overrides(http_request)
-        #if filter.matches(event):
+        # if filter.matches(event):
         #    # stuff

         sync_config = SyncConfig(
@@ -40,7 +40,8 @@ class UploadResource(BaseMediaResource):
         return NOT_DONE_YET

     @defer.inlineCallbacks
-    def create_content(self, media_type, upload_name, content, content_length, auth_user):
+    def create_content(self, media_type, upload_name, content, content_length,
+                       auth_user):
         media_id = random_string(24)

         fname = self.filepaths.local_media_filepath(media_id)
@@ -95,7 +96,7 @@ class UploadResource(BaseMediaResource):
                 code=400,
             )

-        #if headers.hasHeader("Content-Disposition"):
+        # if headers.hasHeader("Content-Disposition"):
         #    disposition = headers.getRawHeaders("Content-Disposition")[0]
         # TODO(markjh): parse content-dispostion

@@ -76,7 +76,7 @@ class StateHandler(object):
                 defer.returnValue(res[1].get((event_type, state_key)))
                 return

-        defer.returnValue(res[1].values())
+        defer.returnValue(res[1])

     @defer.inlineCallbacks
     def compute_event_context(self, event, old_state=None):
@@ -77,6 +77,43 @@ class LoggingTransaction(object):
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)


+class PerformanceCounters(object):
+    def __init__(self):
+        self.current_counters = {}
+        self.previous_counters = {}
+
+    def update(self, key, start_time, end_time=None):
+        if end_time is None:
+            end_time = time.time() * 1000
+        duration = end_time - start_time
+        count, cum_time = self.current_counters.get(key, (0, 0))
+        count += 1
+        cum_time += duration
+        self.current_counters[key] = (count, cum_time)
+        return end_time
+
+    def interval(self, interval_duration, limit=3):
+        counters = []
+        for name, (count, cum_time) in self.current_counters.items():
+            prev_count, prev_time = self.previous_counters.get(name, (0, 0))
+            counters.append((
+                (cum_time - prev_time) / interval_duration,
+                count - prev_count,
+                name
+            ))
+
+        self.previous_counters = dict(self.current_counters)
+
+        counters.sort(reverse=True)
+
+        top_n_counters = ", ".join(
+            "%s(%d): %.3f%%" % (name, count, 100 * ratio)
+            for ratio, count, name in counters[:limit]
+        )
+
+        return top_n_counters
+
+
 class SQLBaseStore(object):
     _TXN_ID = 0

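PerformanceCounters accumulates a per-key call count and cumulative duration, and interval() reports each key's share of the elapsed reporting window before resetting the baseline. A rough usage sketch, assuming the PerformanceCounters class from the hunk above is in scope; the timed function is a stand-in:

import time

def run_query():
    # Stand-in for a unit of work being measured, e.g. a database call.
    time.sleep(0.01)

counters = PerformanceCounters()

start = time.time() * 1000                       # counters work in milliseconds
for _ in range(5):
    run_query()
    start = counters.update("run_query", start)  # returns the new end time

# Top keys over a 10 second window, rendered as "name(count): percent%",
# e.g. roughly "run_query(5): 0.500%" for ~50ms of work in 10000ms.
print(counters.interval(10 * 1000, limit=3))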
@@ -88,6 +125,8 @@ class SQLBaseStore(object):
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
         self._previous_loop_ts = 0
+        self._txn_perf_counters = PerformanceCounters()
+        self._get_event_counters = PerformanceCounters()

     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
@@ -103,7 +142,18 @@ class SQLBaseStore(object):

             ratio = (curr - prev)/(time_now - time_then)

-            logger.info("Total database time: %.3f%%", ratio * 100)
+            top_three_counters = self._txn_perf_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            top_3_event_counters = self._get_event_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            logger.info(
+                "Total database time: %.3f%% {%s} {%s}",
+                ratio * 100, top_three_counters, top_3_event_counters
+            )

         self._clock.looping_call(loop, 10000)

@@ -116,7 +166,7 @@ class SQLBaseStore(object):
         with LoggingContext("runInteraction") as context:
             current_context.copy_to(context)
             start = time.time() * 1000
-            txn_id = SQLBaseStore._TXN_ID
+            txn_id = self._TXN_ID

             # We don't really need these to be unique, so lets stop it from
             # growing really large.
@@ -138,6 +188,7 @@ class SQLBaseStore(object):
                 )

                 self._current_txn_total_time += end - start
+                self._txn_perf_counters.update(desc, start, end)

             with PreserveLoggingContext():
                 result = yield self._db_pool.runInteraction(
@@ -537,6 +588,8 @@ class SQLBaseStore(object):
             "LIMIT 1 "
         )

+        start_time = time.time() * 1000
+
         txn.execute(sql, (event_id,))

         res = txn.fetchone()
@@ -546,6 +599,8 @@ class SQLBaseStore(object):

         internal_metadata, js, redacted, rejected_reason = res

+        self._get_event_counters.update("select_event", start_time)
+
         if allow_rejected or not rejected_reason:
             return self._get_event_from_row_txn(
                 txn, internal_metadata, js, redacted,
@@ -557,10 +612,18 @@ class SQLBaseStore(object):

     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
                                 check_redacted=True, get_prev_content=False):
+
+        start_time = time.time() * 1000
+        update_counter = self._get_event_counters.update
+
         d = json.loads(js)
+        start_time = update_counter("decode_json", start_time)
+
         internal_metadata = json.loads(internal_metadata)
+        start_time = update_counter("decode_internal", start_time)

         ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+        start_time = update_counter("build_frozen_event", start_time)

         if check_redacted and redacted:
             ev = prune_event(ev)
@@ -576,6 +639,7 @@ class SQLBaseStore(object):

             if because:
                 ev.unsigned["redacted_because"] = because
+            start_time = update_counter("redact_event", start_time)

         if get_prev_content and "replaces_state" in ev.unsigned:
             prev = self._get_event_txn(
@@ -585,6 +649,7 @@ class SQLBaseStore(object):
             )
             if prev:
                 ev.unsigned["prev_content"] = prev.get_dict()["content"]
+            start_time = update_counter("get_prev_content", start_time)

         return ev

@@ -91,7 +91,9 @@ class PushRuleStore(SQLBaseStore):
         txn.execute(sql, (user_name, relative_to_rule))
         res = txn.fetchall()
         if not res:
-            raise RuleNotFoundException("before/after rule not found: %s" % (relative_to_rule))
+            raise RuleNotFoundException(
+                "before/after rule not found: %s" % (relative_to_rule,)
+            )
         priority_class, base_rule_priority = res[0]

         if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class: