Start creating a module to do generic notifications (just prints them to stdout currently!)

This commit is contained in:
David Baker 2014-11-19 18:20:59 +00:00
parent 493055731e
commit 74c3879760
12 changed files with 449 additions and 3 deletions

View file

@ -32,6 +32,7 @@ class Codes(object):
LIMIT_EXCEEDED = "M_LIMIT_EXCEEDED" LIMIT_EXCEEDED = "M_LIMIT_EXCEEDED"
CAPTCHA_NEEDED = "M_CAPTCHA_NEEDED" CAPTCHA_NEEDED = "M_CAPTCHA_NEEDED"
CAPTCHA_INVALID = "M_CAPTCHA_INVALID" CAPTCHA_INVALID = "M_CAPTCHA_INVALID"
MISSING_PARAM = "M_MISSING_PARAM"
class CodeMessageException(Exception): class CodeMessageException(Exception):

View file

@ -242,6 +242,8 @@ def setup():
bind_port = None bind_port = None
hs.start_listening(bind_port, config.unsecure_port) hs.start_listening(bind_port, config.unsecure_port)
hs.get_pusherpool().start()
if config.daemonize: if config.daemonize:
print config.pid_file print config.pid_file
daemon = Daemonize( daemon = Daemonize(

76
synapse/push/__init__.py Normal file
View file

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
import synapse.util.async
import logging
logger = logging.getLogger(__name__)
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 10 * 60 * 1000
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
self.hs = _hs
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
self.store = self.hs.get_datastore()
self.user_name = user_name
self.app = app
self.app_display_name = app_display_name
self.device_display_name = device_display_name
self.pushkey = pushkey
self.data = data
self.last_token = last_token
self.backoff_delay = Pusher.INITIAL_BACKOFF
@defer.inlineCallbacks
def start(self):
if not self.last_token:
# First-time setup: get a token to start from (we can't just start from no token, ie. 'now'
# because we need the result to be reproduceable in case we fail to dispatch the push)
config = PaginationConfig(from_token=None, limit='1')
chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=0)
self.last_token = chunk['end']
self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token)
while True:
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
if (self.dispatchPush(chunk['chunk'][0])):
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
else:
logger.warn("Failed to dispatch push for user %s. Trying again in %dms",
self.user_name, self.backoff_delay)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
self.backoff_delay *=2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
class PusherConfigException(Exception):
def __init__(self, msg):
super(PusherConfigException, self).__init__(msg)

View file

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.push import Pusher, PusherConfigException
from synapse.http.client import
import logging
logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
super(HttpPusher, self).__init__(_hs,
user_name,
app,
app_display_name,
device_display_name,
pushkey,
data,
last_token)
if 'url' not in data:
raise PusherConfigException("'url' required in data for HTTP pusher")
self.url = data['url']
def dispatchPush(self, event):
print event
return True

View file

@ -0,0 +1,94 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from httppusher import HttpPusher
from synapse.push import PusherConfigException
import logging
import json
logger = logging.getLogger(__name__)
class PusherPool:
def __init__(self, _hs):
self.hs = _hs
self.store = self.hs.get_datastore()
self.pushers = []
self.last_pusher_started = -1
def start(self):
self._pushers_added()
def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
# we try to create the pusher just to validate the config: it will then get pulled out of the database,
# recreated, added and started: this means we have only one code path adding pushers.
self._create_pusher({
"user_name": user_name,
"kind": kind,
"app": app,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"pushkey": pushkey,
"data": data,
"last_token": None
})
self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
@defer.inlineCallbacks
def _add_pusher_to_store(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
yield self.store.add_pusher(user_name=user_name,
kind=kind,
app=app,
app_display_name=app_display_name,
device_display_name=device_display_name,
pushkey=pushkey,
data=json.dumps(data))
self._pushers_added()
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
return HttpPusher(self.hs,
user_name=pusherdict['user_name'],
app=pusherdict['app'],
app_display_name=pusherdict['app_display_name'],
device_display_name=pusherdict['device_display_name'],
pushkey=pusherdict['pushkey'],
data=pusherdict['data'],
last_token=pusherdict['last_token']
)
else:
raise PusherConfigException("Unknown pusher type '%s' for user %s" %
(pusherdict['kind'], pusherdict['user_name']))
@defer.inlineCallbacks
def _pushers_added(self):
pushers = yield self.store.get_all_pushers_after_id(self.last_pusher_started)
for p in pushers:
p['data'] = json.loads(p['data'])
if (len(pushers)):
self.last_pusher_started = pushers[-1]['id']
self._start_pushers(pushers)
def _start_pushers(self, pushers):
logger.info("Starting %d pushers", (len(pushers)))
for pusherdict in pushers:
p = self._create_pusher(pusherdict)
if p:
self.pushers.append(p)
p.start()

View file

@ -16,7 +16,7 @@
from . import ( from . import (
room, events, register, login, profile, presence, initial_sync, directory, room, events, register, login, profile, presence, initial_sync, directory,
voip, admin, voip, admin, pusher,
) )
@ -45,3 +45,4 @@ class RestServletFactory(object):
directory.register_servlets(hs, client_resource) directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource) voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource) admin.register_servlets(hs, client_resource)
pusher.register_servlets(hs, client_resource)

71
synapse/rest/pusher.py Normal file
View file

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.errors import SynapseError, Codes
from synapse.push import PusherConfigException
from base import RestServlet, client_path_pattern
import json
class PusherRestServlet(RestServlet):
PATTERN = client_path_pattern("/pushers/(?P<pushkey>[\w]*)$")
@defer.inlineCallbacks
def on_PUT(self, request, pushkey):
user = yield self.auth.get_user_by_req(request)
content = _parse_json(request)
reqd = ['kind', 'app', 'app_display_name', 'device_display_name', 'data']
missing = []
for i in reqd:
if i not in content:
missing.append(i)
if len(missing):
raise SynapseError(400, "Missing parameters: "+','.join(missing), errcode=Codes.MISSING_PARAM)
pusher_pool = self.hs.get_pusherpool()
try:
pusher_pool.add_pusher(user_name=user.to_string(),
kind=content['kind'],
app=content['app'],
app_display_name=content['app_display_name'],
device_display_name=content['device_display_name'],
pushkey=pushkey,
data=content['data'])
except PusherConfigException as pce:
raise SynapseError(400, "Config Error: "+pce.message, errcode=Codes.MISSING_PARAM)
defer.returnValue((200, {}))
def on_OPTIONS(self, request):
return (200, {})
# XXX: C+ped from rest/room.py - surely this should be common?
def _parse_json(request):
try:
content = json.loads(request.content.read())
if type(content) != dict:
raise SynapseError(400, "Content must be a JSON object.",
errcode=Codes.NOT_JSON)
return content
except ValueError:
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
def register_servlets(hs, http_server):
PusherRestServlet(hs).register(http_server)

View file

@ -36,6 +36,7 @@ from synapse.util.lockutils import LockManager
from synapse.streams.events import EventSources from synapse.streams.events import EventSources
from synapse.api.ratelimiting import Ratelimiter from synapse.api.ratelimiting import Ratelimiter
from synapse.crypto.keyring import Keyring from synapse.crypto.keyring import Keyring
from synapse.push.pusherpool import PusherPool
class BaseHomeServer(object): class BaseHomeServer(object):
@ -82,6 +83,7 @@ class BaseHomeServer(object):
'ratelimiter', 'ratelimiter',
'keyring', 'keyring',
'event_validator', 'event_validator',
'pusherpool'
] ]
def __init__(self, hostname, **kwargs): def __init__(self, hostname, **kwargs):
@ -228,6 +230,9 @@ class HomeServer(BaseHomeServer):
def build_event_validator(self): def build_event_validator(self):
return EventValidator(self) return EventValidator(self)
def build_pusherpool(self):
return PusherPool(self)
def register_servlets(self): def register_servlets(self):
""" Register all servlets associated with this HomeServer. """ Register all servlets associated with this HomeServer.
""" """

View file

@ -33,6 +33,7 @@ from .stream import StreamStore
from .transactions import TransactionStore from .transactions import TransactionStore
from .keys import KeyStore from .keys import KeyStore
from .event_federation import EventFederationStore from .event_federation import EventFederationStore
from .pusher import PusherStore
from .state import StateStore from .state import StateStore
from .signatures import SignatureStore from .signatures import SignatureStore
@ -62,12 +63,13 @@ SCHEMAS = [
"state", "state",
"event_edges", "event_edges",
"event_signatures", "event_signatures",
"pusher"
] ]
# Remember to update this number every time an incompatible change is made to # Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts. # database schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 6 SCHEMA_VERSION = 7
class _RollbackButIsFineException(Exception): class _RollbackButIsFineException(Exception):
@ -81,7 +83,7 @@ class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, TransactionStore, PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore, DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore, ): EventFederationStore, PusherStore, ):
def __init__(self, hs): def __init__(self, hs):
super(DataStore, self).__init__(hs) super(DataStore, self).__init__(hs)

98
synapse/storage/pusher.py Normal file
View file

@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from ._base import SQLBaseStore, Table
from twisted.internet import defer
from sqlite3 import IntegrityError
from synapse.api.errors import StoreError
import logging
logger = logging.getLogger(__name__)
class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def get_all_pushers_after_id(self, min_id):
sql = (
"SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, last_token "
"FROM pushers "
"WHERE id > ?"
)
rows = yield self._execute(None, sql, min_id)
ret = [
{
"id": r[0],
"user_name": r[1],
"kind": r[2],
"app": r[3],
"app_display_name": r[4],
"device_display_name": r[5],
"pushkey": r[6],
"data": r[7],
"last_token": r[8]
}
for r in rows
]
defer.returnValue(ret)
@defer.inlineCallbacks
def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
try:
yield self._simple_insert(PushersTable.table_name, dict(
user_name=user_name,
kind=kind,
app=app,
app_display_name=app_display_name,
device_display_name=device_display_name,
pushkey=pushkey,
data=data
))
except IntegrityError:
raise StoreError(409, "Pushkey in use.")
except Exception as e:
logger.error("create_pusher with failed: %s", e)
raise StoreError(500, "Problem creating pusher.")
@defer.inlineCallbacks
def update_pusher_last_token(self, user_name, pushkey, last_token):
yield self._simple_update_one(PushersTable.table_name,
{'user_name': user_name, 'pushkey': pushkey},
{'last_token': last_token}
)
class PushersTable(Table):
table_name = "pushers"
fields = [
"id",
"user_name",
"kind",
"app"
"app_display_name",
"device_display_name",
"pushkey",
"data",
"last_token"
]
EntryType = collections.namedtuple("PusherEntry", fields)

View file

@ -0,0 +1,28 @@
/* Copyright 2014 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Push notification endpoints that users have configured
CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
app varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
data text,
last_token TEXT,
FOREIGN KEY(user_name) REFERENCES users(name),
UNIQUE (user_name, pushkey)
);

View file

@ -0,0 +1,28 @@
/* Copyright 2014 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Push notification endpoints that users have configured
CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
app varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
data text,
last_token TEXT,
FOREIGN KEY(user_name) REFERENCES users(name),
UNIQUE (user_name, pushkey)
);