Send some ephemeral events to appservices (#8437)

Optionally sends typing, presence, and read receipt information to appservices.
This commit is contained in:
Will Hunt 2020-10-15 17:33:28 +01:00 committed by GitHub
parent 654e239b25
commit c276bd9969
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 564 additions and 123 deletions

1
changelog.d/8437.feature Normal file
View file

@ -0,0 +1 @@
Implement [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409) to send typing, read receipts, and presence events to appservices.

View file

@ -15,6 +15,7 @@ files =
synapse/events/builder.py,
synapse/events/spamcheck.py,
synapse/federation,
synapse/handlers/appservice.py,
synapse/handlers/account_data.py,
synapse/handlers/auth.py,
synapse/handlers/cas_handler.py,

View file

@ -14,14 +14,15 @@
# limitations under the License.
import logging
import re
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Iterable, List, Match, Optional
from synapse.api.constants import EventTypes
from synapse.appservice.api import ApplicationServiceApi
from synapse.types import GroupID, get_domain_from_id
from synapse.events import EventBase
from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.appservice.api import ApplicationServiceApi
from synapse.storage.databases.main import DataStore
logger = logging.getLogger(__name__)
@ -32,38 +33,6 @@ class ApplicationServiceState:
UP = "up"
class AppServiceTransaction:
"""Represents an application service transaction."""
def __init__(self, service, id, events):
self.service = service
self.id = id
self.events = events
async def send(self, as_api: ApplicationServiceApi) -> bool:
"""Sends this transaction using the provided AS API interface.
Args:
as_api: The API to use to send.
Returns:
True if the transaction was sent.
"""
return await as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
)
async def complete(self, store: "DataStore") -> None:
"""Completes this transaction as successful.
Marks this transaction ID on the application service and removes the
transaction contents from the database.
Args:
store: The database store to operate on.
"""
await store.complete_appservice_txn(service=self.service, txn_id=self.id)
class ApplicationService:
"""Defines an application service. This definition is mostly what is
provided to the /register AS API.
@ -91,6 +60,7 @@ class ApplicationService:
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
supports_ephemeral=False,
):
self.token = token
self.url = (
@ -102,6 +72,7 @@ class ApplicationService:
self.namespaces = self._check_namespaces(namespaces)
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral
if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
@ -161,19 +132,21 @@ class ApplicationService:
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
return namespaces
def _matches_regex(self, test_string, namespace_key):
def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]:
for regex_obj in self.namespaces[namespace_key]:
if regex_obj["regex"].match(test_string):
return regex_obj
return None
def _is_exclusive(self, ns_key, test_string):
def _is_exclusive(self, ns_key: str, test_string: str) -> bool:
regex_obj = self._matches_regex(test_string, ns_key)
if regex_obj:
return regex_obj["exclusive"]
return False
async def _matches_user(self, event, store):
async def _matches_user(
self, event: Optional[EventBase], store: Optional["DataStore"] = None
) -> bool:
if not event:
return False
@ -188,14 +161,23 @@ class ApplicationService:
if not store:
return False
does_match = await self._matches_user_in_member_list(event.room_id, store)
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match
@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)
@cached(num_args=1)
async def matches_user_in_member_list(
self, room_id: str, store: "DataStore"
) -> bool:
"""Check if this service is interested a room based upon it's membership
Args:
room_id: The room to check.
store: The datastore to query.
Returns:
True if this service would like to know about this room.
"""
member_list = await store.get_users_in_room(room_id)
# check joined member events
for user_id in member_list:
@ -203,12 +185,14 @@ class ApplicationService:
return True
return False
def _matches_room_id(self, event):
def _matches_room_id(self, event: EventBase) -> bool:
if hasattr(event, "room_id"):
return self.is_interested_in_room(event.room_id)
return False
async def _matches_aliases(self, event, store):
async def _matches_aliases(
self, event: EventBase, store: Optional["DataStore"] = None
) -> bool:
if not store or not event:
return False
@ -218,12 +202,15 @@ class ApplicationService:
return True
return False
async def is_interested(self, event, store=None) -> bool:
async def is_interested(
self, event: EventBase, store: Optional["DataStore"] = None
) -> bool:
"""Check if this service is interested in this event.
Args:
event(Event): The event to check.
store(DataStore)
event: The event to check.
store: The datastore to query.
Returns:
True if this service would like to know about this event.
"""
@ -231,39 +218,66 @@ class ApplicationService:
if self._matches_room_id(event):
return True
if await self._matches_aliases(event, store):
# This will check the namespaces first before
# checking the store, so should be run before _matches_aliases
if await self._matches_user(event, store):
return True
if await self._matches_user(event, store):
# This will check the store, so should be run last
if await self._matches_aliases(event, store):
return True
return False
def is_interested_in_user(self, user_id):
@cached(num_args=1)
async def is_interested_in_presence(
self, user_id: UserID, store: "DataStore"
) -> bool:
"""Check if this service is interested a user's presence
Args:
user_id: The user to check.
store: The datastore to query.
Returns:
True if this service would like to know about presence for this user.
"""
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
return True
room_ids = await store.get_rooms_for_user(user_id.to_string())
# Then find out if the appservice is interested in any of those rooms
for room_id in room_ids:
if await self.matches_user_in_member_list(room_id, store):
return True
return False
def is_interested_in_user(self, user_id: str) -> bool:
return (
self._matches_regex(user_id, ApplicationService.NS_USERS)
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
or user_id == self.sender
)
def is_interested_in_alias(self, alias):
def is_interested_in_alias(self, alias: str) -> bool:
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
def is_interested_in_room(self, room_id):
def is_interested_in_room(self, room_id: str) -> bool:
return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
def is_exclusive_user(self, user_id):
def is_exclusive_user(self, user_id: str) -> bool:
return (
self._is_exclusive(ApplicationService.NS_USERS, user_id)
or user_id == self.sender
)
def is_interested_in_protocol(self, protocol):
def is_interested_in_protocol(self, protocol: str) -> bool:
return protocol in self.protocols
def is_exclusive_alias(self, alias):
def is_exclusive_alias(self, alias: str) -> bool:
return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
def is_exclusive_room(self, room_id):
def is_exclusive_room(self, room_id: str) -> bool:
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
def get_exclusive_user_regexes(self):
@ -276,14 +290,14 @@ class ApplicationService:
if regex_obj["exclusive"]
]
def get_groups_for_user(self, user_id):
def get_groups_for_user(self, user_id: str) -> Iterable[str]:
"""Get the groups that this user is associated with by this AS
Args:
user_id (str): The ID of the user.
user_id: The ID of the user.
Returns:
iterable[str]: an iterable that yields group_id strings.
An iterable that yields group_id strings.
"""
return (
regex_obj["group_id"]
@ -291,7 +305,7 @@ class ApplicationService:
if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
)
def is_rate_limited(self):
def is_rate_limited(self) -> bool:
return self.rate_limited
def __str__(self):
@ -300,3 +314,45 @@ class ApplicationService:
dict_copy["token"] = "<redacted>"
dict_copy["hs_token"] = "<redacted>"
return "ApplicationService: %s" % (dict_copy,)
class AppServiceTransaction:
"""Represents an application service transaction."""
def __init__(
self,
service: ApplicationService,
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
Args:
as_api: The API to use to send.
Returns:
True if the transaction was sent.
"""
return await as_api.push_bulk(
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
txn_id=self.id,
)
async def complete(self, store: "DataStore") -> None:
"""Completes this transaction as successful.
Marks this transaction ID on the application service and removes the
transaction contents from the database.
Args:
store: The database store to operate on.
"""
await store.complete_appservice_txn(service=self.service, txn_id=self.id)

View file

@ -14,12 +14,13 @@
# limitations under the License.
import logging
import urllib
from typing import TYPE_CHECKING, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple
from prometheus_client import Counter
from synapse.api.constants import EventTypes, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
@ -201,7 +202,13 @@ class ApplicationServiceApi(SimpleHttpClient):
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)
async def push_bulk(self, service, events, txn_id=None):
async def push_bulk(
self,
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
txn_id: Optional[int] = None,
):
if service.url is None:
return True
@ -211,15 +218,19 @@ class ApplicationServiceApi(SimpleHttpClient):
logger.warning(
"push_bulk: Missing txn ID sending events to %s", service.url
)
txn_id = str(0)
txn_id = str(txn_id)
txn_id = 0
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
# Never send ephemeral events to appservices that do not support it
if service.supports_ephemeral:
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
else:
body = {"events": events}
uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
try:
await self.put_json(
uri=uri,
json_body={"events": events},
args={"access_token": service.hs_token},
uri=uri, json_body=body, args={"access_token": service.hs_token},
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))

View file

@ -49,10 +49,13 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import List
from synapse.appservice import ApplicationServiceState
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.events import EventBase
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -82,8 +85,13 @@ class ApplicationServiceScheduler:
for service in services:
self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
def submit_event_for_as(self, service: ApplicationService, event: EventBase):
self.queuer.enqueue_event(service, event)
def submit_ephemeral_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
):
self.queuer.enqueue_ephemeral(service, events)
class _ServiceQueuer:
@ -96,17 +104,15 @@ class _ServiceQueuer:
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
self.queued_ephemeral = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
def enqueue(self, service, event):
self.queued_events.setdefault(service.id, []).append(event)
def _start_background_request(self, service):
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
@ -114,7 +120,15 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id,), self._send_request, service
)
async def _send_request(self, service):
def enqueue_event(self, service: ApplicationService, event: EventBase):
self.queued_events.setdefault(service.id, []).append(event)
self._start_background_request(service)
def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
@ -123,10 +137,11 @@ class _ServiceQueuer:
try:
while True:
events = self.queued_events.pop(service.id, [])
if not events:
ephemeral = self.queued_ephemeral.pop(service.id, [])
if not events and not ephemeral:
return
try:
await self.txn_ctrl.send(service, events)
await self.txn_ctrl.send(service, events, ephemeral)
except Exception:
logger.exception("AS request failed")
finally:
@ -158,9 +173,16 @@ class _TransactionController:
# for UTs
self.RECOVERER_CLASS = _Recoverer
async def send(self, service, events):
async def send(
self,
service: ApplicationService,
events: List[EventBase],
ephemeral: List[JsonDict] = [],
):
try:
txn = await self.store.create_appservice_txn(service=service, events=events)
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral
)
service_is_up = await self._is_service_up(service)
if service_is_up:
sent = await txn.send(self.as_api)
@ -204,7 +226,7 @@ class _TransactionController:
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
async def _is_service_up(self, service):
async def _is_service_up(self, service: ApplicationService) -> bool:
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None

View file

@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename):
if as_info.get("ip_range_whitelist"):
ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist"))
supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False)
return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename):
hs_token=as_info["hs_token"],
sender=user_id,
id=as_info["id"],
supports_ephemeral=supports_ephemeral,
protocols=protocols,
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,

View file

@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Dict, List, Optional
from prometheus_client import Counter
@ -21,13 +22,16 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken
from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@ -44,6 +48,7 @@ class ApplicationServicesHandler:
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
self.event_sources = hs.get_event_sources()
self.current_max = 0
self.is_processing = False
@ -82,7 +87,7 @@ class ApplicationServicesHandler:
if not events:
break
events_by_room = {}
events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@ -161,6 +166,104 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
async def notify_interested_services_ephemeral(
self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
):
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
This will determine which appservices
are interested in the event, and submit them.
Events will only be pushed to appservices
that have opted into ephemeral events
Args:
stream_key: The stream the event came from.
new_token: The latest stream token
users: The user(s) involved with the event.
"""
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services or not self.notify_appservices:
return
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# We don't persist the token for typing_key for performance reasons
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
async def _handle_typing(self, service: ApplicationService, new_token: int):
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# for appservices.
from_key=new_token - 1,
)
return typing
async def _handle_receipts(self, service: ApplicationService):
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
receipts_source = self.event_sources.sources["receipt"]
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts
async def _handle_presence(
self, service: ApplicationService, users: Collection[UserID]
):
events = [] # type: List[JsonDict]
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _ = await presence_source.get_new_events(
user=user, service=service, from_key=from_key,
)
time_now = self.clock.time_msec()
presence_events = [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in presence_events
]
events = events + presence_events
async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.
@ -223,7 +326,7 @@ class ApplicationServicesHandler:
async def get_3pe_protocols(self, only_protocol=None):
services = self.store.get_app_services()
protocols = {}
protocols = {} # type: Dict[str, List[JsonDict]]
# Collect up all the individual protocol responses out of the ASes
for s in services:

View file

@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Tuple
from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt, get_domain_from_id
from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
@ -140,5 +142,36 @@ class ReceiptEventSource:
return (events, to_key)
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
"""
from_key = int(from_key)
to_key = self.get_current_key()
if from_key == to_key:
return [], to_key
# We first need to fetch all new receipts
rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
from_key=from_key, to_key=to_key
)
# Then filter down to rooms that the AS can read
events = []
for room_id, event in rooms_to_events.items():
if not await service.matches_user_in_member_list(room_id, self.store):
continue
events.append(event)
return (events, to_key)
def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()

View file

@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple

View file

@ -12,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@ -430,6 +430,33 @@ class TypingNotificationEventSource:
"content": {"user_ids": list(typing)},
}
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new typing events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
"""
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()
events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
continue
if not await service.matches_user_in_member_list(
room_id, handler.store
):
continue
events.append(self._make_event_for(room_id))
return (events, handler._latest_room_serial)
async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)

View file

@ -329,6 +329,22 @@ class Notifier:
except Exception:
logger.exception("Error notifying application services of event")
async def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
await self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users
)
except Exception:
logger.exception("Error notifying application services of event")
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_token)
@ -367,6 +383,15 @@ class Notifier:
self.notify_replication()
# Notify appservices
run_as_background_process(
"_notify_app_services_ephemeral",
self._notify_app_services_ephemeral,
stream_key,
new_token,
users,
)
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""

View file

@ -15,12 +15,15 @@
# limitations under the License.
import logging
import re
from typing import List
from synapse.appservice import AppServiceTransaction
from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder
logger = logging.getLogger(__name__)
@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore(
"application_services_state", {"as_id": service.id}, {"state": state}
)
async def create_appservice_txn(self, service, events):
async def create_appservice_txn(
self,
service: ApplicationService,
events: List[EventBase],
ephemeral: List[JsonDict],
) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events.
with the given list of events. Ephemeral events are NOT persisted to the
database and are not resent if a transaction is retried.
Args:
service(ApplicationService): The service who the transaction is for.
events(list<Event>): A list of events to put in the transaction.
service: The service who the transaction is for.
events: A list of persistent events to put in the transaction.
ephemeral: A list of ephemeral events to put in the transaction.
Returns:
AppServiceTransaction: A new transaction.
A new transaction.
"""
def _create_appservice_txn(txn):
@ -207,7 +218,9 @@ class ApplicationServiceTransactionWorkerStore(
"VALUES(?,?,?)",
(service.id, new_txn_id, event_ids),
)
return AppServiceTransaction(service=service, id=new_txn_id, events=events)
return AppServiceTransaction(
service=service, id=new_txn_id, events=events, ephemeral=ephemeral
)
return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn
@ -296,7 +309,9 @@ class ApplicationServiceTransactionWorkerStore(
events = await self.get_events_as_list(event_ids)
return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
return AppServiceTransaction(
service=service, id=entry["txn_id"], events=events, ephemeral=[]
)
def _get_last_txn(self, txn, service_id):
txn.execute(
@ -320,7 +335,7 @@ class ApplicationServiceTransactionWorkerStore(
)
async def get_new_events_for_appservice(self, current_id, limit):
"""Get all new evnets"""
"""Get all new events for an appservice"""
def get_new_events_for_appservice_txn(txn):
sql = (
@ -351,6 +366,39 @@ class ApplicationServiceTransactionWorkerStore(
return upper_bound, events
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
def get_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"SELECT ? FROM application_services_state WHERE as_id=?",
(stream_id_type, service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0])
return await self.db_pool.runInteraction(
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
)
async def set_type_stream_id_for_appservice(
self, service: ApplicationService, type: str, pos: int
) -> None:
def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
(stream_id_type, pos, service.id),
)
await self.db_pool.runInteraction(
"set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
)
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
# This is currently empty due to there not being any AS storage functions

View file

@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
}
return results
@cached(num_args=2,)
async def get_linearized_receipts_for_all_rooms(
self, to_key: int, from_key: Optional[int] = None
) -> Dict[str, JsonDict]:
"""Get receipts for all rooms between two stream_ids.
Args:
to_key: Max stream id to fetch receipts upto.
from_key: Min stream id to fetch receipts from. None fetches
from the start.
Returns:
A dictionary of roomids to a list of receipts.
"""
def f(txn):
if from_key:
sql = """
SELECT * FROM receipts_linearized WHERE
stream_id > ? AND stream_id <= ?
"""
txn.execute(sql, [from_key, to_key])
else:
sql = """
SELECT * FROM receipts_linearized WHERE
stream_id <= ?
"""
txn.execute(sql, [to_key])
return self.db_pool.cursor_to_dict(txn)
txn_results = await self.db_pool.runInteraction(
"get_linearized_receipts_for_all_rooms", f
)
results = {}
for row in txn_results:
# We want a single event per room, since we want to batch the
# receipts by room, event and type.
room_event = results.setdefault(
row["room_id"],
{"type": "m.receipt", "room_id": row["room_id"], "content": {}},
)
# The content is of the form:
# {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
event_entry = room_event["content"].setdefault(row["event_id"], {})
receipt_type = event_entry.setdefault(row["receipt_type"], {})
receipt_type[row["user_id"]] = db_to_json(row["data"])
return results
async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:

View file

@ -0,0 +1,18 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE application_services_state
ADD COLUMN read_receipt_stream_id INT,
ADD COLUMN presence_stream_id INT;

View file

@ -60,7 +60,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
service=service, events=events, ephemeral=[] # txn made and saved
)
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
txn.complete.assert_called_once_with(self.store) # txn completed
@ -81,7 +81,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
service=service, events=events, ephemeral=[] # txn made and saved
)
self.assertEquals(0, txn.send.call_count) # txn not sent though
self.assertEquals(0, txn.complete.call_count) # or completed
@ -106,7 +106,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events
service=service, events=events, ephemeral=[]
)
self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made
self.assertEquals(1, self.recoverer.recover.call_count) # and invoked
@ -202,26 +202,28 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
# Expect the event to be sent immediately.
service = Mock(id=4)
event = Mock()
self.queuer.enqueue(service, event)
self.txn_ctrl.send.assert_called_once_with(service, [event])
self.queuer.enqueue_event(service, event)
self.txn_ctrl.send.assert_called_once_with(service, [event], [])
def test_send_single_event_with_queue(self):
d = defer.Deferred()
self.txn_ctrl.send = Mock(side_effect=lambda x, y: make_deferred_yieldable(d))
self.txn_ctrl.send = Mock(
side_effect=lambda x, y, z: make_deferred_yieldable(d)
)
service = Mock(id=4)
event = Mock(event_id="first")
event2 = Mock(event_id="second")
event3 = Mock(event_id="third")
# Send an event and don't resolve it just yet.
self.queuer.enqueue(service, event)
self.queuer.enqueue_event(service, event)
# Send more events: expect send() to NOT be called multiple times.
self.queuer.enqueue(service, event2)
self.queuer.enqueue(service, event3)
self.txn_ctrl.send.assert_called_with(service, [event])
self.queuer.enqueue_event(service, event2)
self.queuer.enqueue_event(service, event3)
self.txn_ctrl.send.assert_called_with(service, [event], [])
self.assertEquals(1, self.txn_ctrl.send.call_count)
# Resolve the send event: expect the queued events to be sent
d.callback(service)
self.txn_ctrl.send.assert_called_with(service, [event2, event3])
self.txn_ctrl.send.assert_called_with(service, [event2, event3], [])
self.assertEquals(2, self.txn_ctrl.send.call_count)
def test_multiple_service_queues(self):
@ -239,21 +241,58 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
send_return_list = [srv_1_defer, srv_2_defer]
def do_send(x, y):
def do_send(x, y, z):
return make_deferred_yieldable(send_return_list.pop(0))
self.txn_ctrl.send = Mock(side_effect=do_send)
# send events for different ASes and make sure they are sent
self.queuer.enqueue(srv1, srv_1_event)
self.queuer.enqueue(srv1, srv_1_event2)
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event])
self.queuer.enqueue(srv2, srv_2_event)
self.queuer.enqueue(srv2, srv_2_event2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event])
self.queuer.enqueue_event(srv1, srv_1_event)
self.queuer.enqueue_event(srv1, srv_1_event2)
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], [])
self.queuer.enqueue_event(srv2, srv_2_event)
self.queuer.enqueue_event(srv2, srv_2_event2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], [])
# make sure callbacks for a service only send queued events for THAT
# service
srv_2_defer.callback(srv2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2])
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [])
self.assertEquals(3, self.txn_ctrl.send.call_count)
def test_send_single_ephemeral_no_queue(self):
# Expect the event to be sent immediately.
service = Mock(id=4, name="service")
event_list = [Mock(name="event")]
self.queuer.enqueue_ephemeral(service, event_list)
self.txn_ctrl.send.assert_called_once_with(service, [], event_list)
def test_send_multiple_ephemeral_no_queue(self):
# Expect the event to be sent immediately.
service = Mock(id=4, name="service")
event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")]
self.queuer.enqueue_ephemeral(service, event_list)
self.txn_ctrl.send.assert_called_once_with(service, [], event_list)
def test_send_single_ephemeral_with_queue(self):
d = defer.Deferred()
self.txn_ctrl.send = Mock(
side_effect=lambda x, y, z: make_deferred_yieldable(d)
)
service = Mock(id=4)
event_list_1 = [Mock(event_id="event1"), Mock(event_id="event2")]
event_list_2 = [Mock(event_id="event3"), Mock(event_id="event4")]
event_list_3 = [Mock(event_id="event5"), Mock(event_id="event6")]
# Send an event and don't resolve it just yet.
self.queuer.enqueue_ephemeral(service, event_list_1)
# Send more events: expect send() to NOT be called multiple times.
self.queuer.enqueue_ephemeral(service, event_list_2)
self.queuer.enqueue_ephemeral(service, event_list_3)
self.txn_ctrl.send.assert_called_with(service, [], event_list_1)
self.assertEquals(1, self.txn_ctrl.send.call_count)
# Resolve txn_ctrl.send
d.callback(service)
# Expect the queued events to be sent
self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3)
self.assertEquals(2, self.txn_ctrl.send.call_count)

View file

@ -244,7 +244,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
service = Mock(id=self.as_list[0]["id"])
events = [Mock(event_id="e1"), Mock(event_id="e2")]
txn = yield defer.ensureDeferred(
self.store.create_appservice_txn(service, events)
self.store.create_appservice_txn(service, events, [])
)
self.assertEquals(txn.id, 1)
self.assertEquals(txn.events, events)
@ -258,7 +258,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
yield self._insert_txn(service.id, 9644, events)
yield self._insert_txn(service.id, 9645, events)
txn = yield defer.ensureDeferred(
self.store.create_appservice_txn(service, events)
self.store.create_appservice_txn(service, events, [])
)
self.assertEquals(txn.id, 9646)
self.assertEquals(txn.events, events)
@ -270,7 +270,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
events = [Mock(event_id="e1"), Mock(event_id="e2")]
yield self._set_last_txn(service.id, 9643)
txn = yield defer.ensureDeferred(
self.store.create_appservice_txn(service, events)
self.store.create_appservice_txn(service, events, [])
)
self.assertEquals(txn.id, 9644)
self.assertEquals(txn.events, events)
@ -293,7 +293,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
yield self._insert_txn(self.as_list[3]["id"], 9643, events)
txn = yield defer.ensureDeferred(
self.store.create_appservice_txn(service, events)
self.store.create_appservice_txn(service, events, [])
)
self.assertEquals(txn.id, 9644)
self.assertEquals(txn.events, events)