# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # from typing import Any, Dict, List, Tuple from unittest.mock import Mock from parameterized import parameterized from twisted.internet.defer import Deferred from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.logging.context import make_deferred_yieldable from synapse.push import PusherConfig, PusherConfigException from synapse.rest.admin.experimental_features import ExperimentalFeature from synapse.rest.client import login, push_rule, pusher, receipts, room, versions from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock from tests.unittest import HomeserverTestCase, override_config class HTTPPusherTests(HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, room.register_servlets, login.register_servlets, receipts.register_servlets, push_rule.register_servlets, pusher.register_servlets, versions.register_servlets, ] user_id = True hijack_auth = False def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.push_attempts: List[Tuple[Deferred, str, dict]] = [] m = Mock() def post_json_get_json(url: str, body: JsonDict) -> Deferred: d: Deferred = Deferred() self.push_attempts.append((d, url, body)) return make_deferred_yieldable(d) m.post_json_get_json = post_json_get_json hs = self.setup_test_homeserver(proxied_blocklisted_http_client=m) return hs def test_invalid_configuration(self) -> None: """Invalid push configurations should be rejected.""" # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id def test_data(data: Any) -> None: self.get_failure( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data=data, ), PusherConfigException, ) # Data must be provided with a URL. test_data(None) test_data({}) test_data({"url": 1}) # A bare domain name isn't accepted. test_data({"url": "example.com"}) # A URL without a path isn't accepted. test_data({"url": "http://example.com"}) # A url with an incorrect path isn't accepted. test_data({"url": "http://example.com/foo"}) def test_sends_http(self) -> None: """ The HTTP pusher will send pushes for each message to a HTTP endpoint when configured to do so. """ # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("otheruser", "pass") other_access_token = self.login("otheruser", "pass") # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, ) ) # Create a room room = self.helper.create_room_as(user_id, tok=access_token) # The other user joins self.helper.join(room=room, user=other_user_id, tok=other_access_token) # The other user sends some messages self.helper.send(room, body="Hi!", tok=other_access_token) self.helper.send(room, body="There!", tok=other_access_token) # Get the stream ordering before it gets sent pushers = list( self.get_success( self.hs.get_datastores().main.get_pushers_by({"user_name": user_id}) ) ) self.assertEqual(len(pushers), 1) last_stream_ordering = pushers[0].last_stream_ordering # Advance time a bit, so the pusher will register something has happened self.pump() # It hasn't succeeded yet, so the stream ordering shouldn't have moved pushers = list( self.get_success( self.hs.get_datastores().main.get_pushers_by({"user_name": user_id}) ) ) self.assertEqual(len(pushers), 1) self.assertEqual(last_stream_ordering, pushers[0].last_stream_ordering) # One push was attempted to be sent -- it'll be the first message self.assertEqual(len(self.push_attempts), 1) self.assertEqual( self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual( self.push_attempts[0][2]["notification"]["content"]["body"], "Hi!" ) # Make the push succeed self.push_attempts[0][0].callback({}) self.pump() # The stream ordering has increased pushers = list( self.get_success( self.hs.get_datastores().main.get_pushers_by({"user_name": user_id}) ) ) self.assertEqual(len(pushers), 1) self.assertTrue(pushers[0].last_stream_ordering > last_stream_ordering) last_stream_ordering = pushers[0].last_stream_ordering # Now it'll try and send the second push message, which will be the second one self.assertEqual(len(self.push_attempts), 2) self.assertEqual( self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual( self.push_attempts[1][2]["notification"]["content"]["body"], "There!" ) # Make the second push succeed self.push_attempts[1][0].callback({}) self.pump() # The stream ordering has increased, again pushers = list( self.get_success( self.hs.get_datastores().main.get_pushers_by({"user_name": user_id}) ) ) self.assertEqual(len(pushers), 1) self.assertTrue(pushers[0].last_stream_ordering > last_stream_ordering) def test_sends_high_priority_for_encrypted(self) -> None: """ The HTTP pusher will send pushes at high priority if they correspond to an encrypted message. This will happen both in 1:1 rooms and larger rooms. """ # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("otheruser", "pass") other_access_token = self.login("otheruser", "pass") # Register a third user yet_another_user_id = self.register_user("yetanotheruser", "pass") yet_another_access_token = self.login("yetanotheruser", "pass") # Create a room room = self.helper.create_room_as(user_id, tok=access_token) # The other user joins self.helper.join(room=room, user=other_user_id, tok=other_access_token) # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, ) ) # Send an encrypted event # I know there'd normally be set-up of an encrypted room first # but this will do for our purposes self.helper.send_event( room, "m.room.encrypted", content={ "algorithm": "m.megolm.v1.aes-sha2", "sender_key": "6lImKbzK51MzWLwHh8tUM3UBBSBrLlgup/OOCGTvumM", "ciphertext": "AwgAErABoRxwpMipdgiwXgu46rHiWQ0DmRj0qUlPrMraBUDk" "leTnJRljpuc7IOhsYbLY3uo2WI0ab/ob41sV+3JEIhODJPqH" "TK7cEZaIL+/up9e+dT9VGF5kRTWinzjkeqO8FU5kfdRjm+3w" "0sy3o1OCpXXCfO+faPhbV/0HuK4ndx1G+myNfK1Nk/CxfMcT" "BT+zDS/Df/QePAHVbrr9uuGB7fW8ogW/ulnydgZPRluusFGv" "J3+cg9LoPpZPAmv5Me3ec7NtdlfN0oDZ0gk3TiNkkhsxDG9Y" "YcNzl78USI0q8+kOV26Bu5dOBpU4WOuojXZHJlP5lMgdzLLl" "EQ0", "session_id": "IigqfNWLL+ez/Is+Duwp2s4HuCZhFG9b9CZKTYHtQ4A", "device_id": "AHQDUSTAAA", }, tok=other_access_token, ) # Advance time a bit, so the pusher will register something has happened self.pump() # Make the push succeed self.push_attempts[0][0].callback({}) self.pump() # Check our push made it with high priority self.assertEqual(len(self.push_attempts), 1) self.assertEqual( self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high") # Add yet another person — we want to make this room not a 1:1 # (as encrypted messages in a 1:1 currently have tweaks applied # so it doesn't properly exercise the condition of all encrypted # messages need to be high). self.helper.join( room=room, user=yet_another_user_id, tok=yet_another_access_token ) # Check no push notifications are sent regarding the membership changes # (that would confuse the test) self.pump() self.assertEqual(len(self.push_attempts), 1) # Send another encrypted event self.helper.send_event( room, "m.room.encrypted", content={ "ciphertext": "AwgAEoABtEuic/2DF6oIpNH+q/PonzlhXOVho8dTv0tzFr5m" "9vTo50yabx3nxsRlP2WxSqa8I07YftP+EKWCWJvTkg6o7zXq" "6CK+GVvLQOVgK50SfvjHqJXN+z1VEqj+5mkZVN/cAgJzoxcH" "zFHkwDPJC8kQs47IHd8EO9KBUK4v6+NQ1uE/BIak4qAf9aS/" "kI+f0gjn9IY9K6LXlah82A/iRyrIrxkCkE/n0VfvLhaWFecC" "sAWTcMLoF6fh1Jpke95mljbmFSpsSd/eEQw", "device_id": "SRCFTWTHXO", "session_id": "eMA+bhGczuTz1C5cJR1YbmrnnC6Goni4lbvS5vJ1nG4", "algorithm": "m.megolm.v1.aes-sha2", "sender_key": "rC/XSIAiYrVGSuaHMop8/pTZbku4sQKBZwRwukgnN1c", }, tok=other_access_token, ) # Advance time a bit, so the pusher will register something has happened self.pump() self.assertEqual(len(self.push_attempts), 2) self.assertEqual( self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "high") def test_sends_high_priority_for_one_to_one_only(self) -> None: """ The HTTP pusher will send pushes at high priority if they correspond to a message in a one-to-one room. """ # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("otheruser", "pass") other_access_token = self.login("otheruser", "pass") # Register a third user yet_another_user_id = self.register_user("yetanotheruser", "pass") yet_another_access_token = self.login("yetanotheruser", "pass") # Create a room room = self.helper.create_room_as(user_id, tok=access_token) # The other user joins self.helper.join(room=room, user=other_user_id, tok=other_access_token) # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, ) ) # Send a message self.helper.send(room, body="Hi!", tok=other_access_token) # Advance time a bit, so the pusher will register something has happened self.pump() # Make the push succeed self.push_attempts[0][0].callback({}) self.pump() # Check our push made it with high priority — this is a one-to-one room self.assertEqual(len(self.push_attempts), 1) self.assertEqual( self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high") # Yet another user joins self.helper.join( room=room, user=yet_another_user_id, tok=yet_another_access_token ) # Check no push notifications are sent regarding the membership changes # (that would confuse the test) self.pump() self.assertEqual(len(self.push_attempts), 1) # Send another event self.helper.send(room, body="Welcome!", tok=other_access_token) # Advance time a bit, so the pusher will register something has happened self.pump() self.assertEqual(len(self.push_attempts), 2) self.assertEqual( self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) # check that this is low-priority self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") def test_sends_high_priority_for_mention(self) -> None: """ The HTTP pusher will send pushes at high priority if they correspond to a message containing the user's display name. """ # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("otheruser", "pass") other_access_token = self.login("otheruser", "pass") # Register a third user yet_another_user_id = self.register_user("yetanotheruser", "pass") yet_another_access_token = self.login("yetanotheruser", "pass") # Create a room room = self.helper.create_room_as(user_id, tok=access_token) # The other users join self.helper.join(room=room, user=other_user_id, tok=other_access_token) self.helper.join( room=room, user=yet_another_user_id, tok=yet_another_access_token ) # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, ) ) # Send a message self.helper.send(room, body="Oh, user, hello!", tok=other_access_token) # Advance time a bit, so the pusher will register something has happened self.pump() # Make the push succeed self.push_attempts[0][0].callback({}) self.pump() # Check our push made it with high priority self.assertEqual(len(self.push_attempts), 1) self.assertEqual( self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high") # Send another event, this time with no mention self.helper.send(room, body="Are you there?", tok=other_access_token) # Advance time a bit, so the pusher will register something has happened self.pump() self.assertEqual(len(self.push_attempts), 2) self.assertEqual( self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) # check that this is low-priority self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") def test_sends_high_priority_for_atroom(self) -> None: """ The HTTP pusher will send pushes at high priority if they correspond to a message that contains @room. """ # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("otheruser", "pass") other_access_token = self.login("otheruser", "pass") # Register a third user yet_another_user_id = self.register_user("yetanotheruser", "pass") yet_another_access_token = self.login("yetanotheruser", "pass") # Create a room (as other_user so the power levels are compatible with # other_user sending @room). room = self.helper.create_room_as(other_user_id, tok=other_access_token) # The other users join self.helper.join(room=room, user=user_id, tok=access_token) self.helper.join( room=room, user=yet_another_user_id, tok=yet_another_access_token ) # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, ) ) # Send a message self.helper.send( room, body="@room eeek! There's a spider on the table!", tok=other_access_token, ) # Advance time a bit, so the pusher will register something has happened self.pump() # Make the push succeed self.push_attempts[0][0].callback({}) self.pump() # Check our push made it with high priority self.assertEqual(len(self.push_attempts), 1) self.assertEqual( self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify" ) self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high") # Send another event, this time as someone without the power of @room self.helper.send( room, body="@room the spider is gone", tok=yet_another_access_token ) # Advance time a bit, so the pusher will register something has happened self.pump() self.assertEqual(len(self.push_attempts), 2) self.assertEqual( self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) # check that this is low-priority self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") def test_push_unread_count_group_by_room(self) -> None: """ The HTTP pusher will group unread count by number of unread rooms. """ # Carry out common push count tests and setup self._test_push_unread_count() # Carry out our option-value specific test # # This push should still only contain an unread count of 1 (for 1 unread room) self._check_push_attempt(7, 1) @override_config({"push": {"group_unread_count_by_room": False}}) def test_push_unread_count_message_count(self) -> None: """ The HTTP pusher will send the total unread message count. """ # Carry out common push count tests and setup self._test_push_unread_count() # Carry out our option-value specific test # # We're counting every unread message, so there should now be 3 since the # last read receipt self._check_push_attempt(7, 3) def _test_push_unread_count(self) -> None: """ Tests that the correct unread count appears in sent push notifications Note that: * Sending messages will cause push notifications to go out to relevant users * Sending a read receipt will cause the HTTP pusher to check whether the unread count has changed since the last push notification. If so, a "badge update" notification goes out to the user that sent the receipt """ # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("other_user", "pass") other_access_token = self.login("other_user", "pass") # Create a room (as other_user) room_id = self.helper.create_room_as(other_user_id, tok=other_access_token) # The user to get notified joins self.helper.join(room=room_id, user=user_id, tok=access_token) # Register the pusher user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, ) ) # Send a message response = self.helper.send( room_id, body="Hello there!", tok=other_access_token ) first_message_event_id = response["event_id"] expected_push_attempts = 1 self._check_push_attempt(expected_push_attempts, 1) self._send_read_request(access_token, first_message_event_id, room_id) # Unread count has changed. Therefore, ensure that read request triggers # a push notification. expected_push_attempts += 1 self.assertEqual(len(self.push_attempts), expected_push_attempts) # Send another message response2 = self.helper.send( room_id, body="How's the weather today?", tok=other_access_token ) second_message_event_id = response2["event_id"] expected_push_attempts += 1 self._check_push_attempt(expected_push_attempts, 1) self._send_read_request(access_token, second_message_event_id, room_id) expected_push_attempts += 1 self._check_push_attempt(expected_push_attempts, 0) # If we're grouping by room, sending more messages shouldn't increase the # unread count, as they're all being sent in the same room. Otherwise, it # should. Therefore, the last call to _check_push_attempt is done in the # caller method. self.helper.send(room_id, body="Hello?", tok=other_access_token) expected_push_attempts += 1 self._advance_time_and_make_push_succeed(expected_push_attempts) self.helper.send(room_id, body="Hello??", tok=other_access_token) expected_push_attempts += 1 self._advance_time_and_make_push_succeed(expected_push_attempts) self.helper.send(room_id, body="HELLO???", tok=other_access_token) def _advance_time_and_make_push_succeed(self, expected_push_attempts: int) -> None: self.pump() self.push_attempts[expected_push_attempts - 1][0].callback({}) def _check_push_attempt( self, expected_push_attempts: int, expected_unread_count_last_push: int ) -> None: """ Makes sure that the last expected push attempt succeeds and checks whether it contains the expected unread count. """ self._advance_time_and_make_push_succeed(expected_push_attempts) # Check our push made it self.assertEqual(len(self.push_attempts), expected_push_attempts) _, push_url, push_body = self.push_attempts[expected_push_attempts - 1] self.assertEqual( push_url, "http://example.com/_matrix/push/v1/notify", ) # Check that the unread count for the room is 0 # # The unread count is zero as the user has no read receipt in the room yet self.assertEqual( push_body["notification"]["counts"]["unread"], expected_unread_count_last_push, ) def _send_read_request( self, access_token: str, message_event_id: str, room_id: str ) -> None: # Now set the user's read receipt position to the first event # # This will actually trigger a new notification to be sent out so that # even if the user does not receive another message, their unread # count goes down channel = self.make_request( "POST", "/rooms/%s/receipt/m.read/%s" % (room_id, message_event_id), {}, access_token=access_token, ) self.assertEqual(channel.code, 200, channel.json_body) def _make_user_with_pusher( self, username: str, enabled: bool = True ) -> Tuple[str, str]: """Registers a user and creates a pusher for them. Args: username: the localpart of the new user's Matrix ID. enabled: whether to create the pusher in an enabled or disabled state. """ user_id = self.register_user(username, "pass") access_token = self.login(username, "pass") # Register the pusher self._set_pusher(user_id, access_token, enabled) return user_id, access_token def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: """Creates or updates the pusher for the given user. Args: user_id: the user's Matrix ID. access_token: the access token associated with the pusher. enabled: whether to enable or disable the pusher. """ user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, enabled=enabled, ) ) def test_dont_notify_rule_overrides_message(self) -> None: """ The override push rule will suppress notification """ user_id, access_token = self._make_user_with_pusher("user") other_user_id, other_access_token = self._make_user_with_pusher("otheruser") # Create a room room = self.helper.create_room_as(user_id, tok=access_token) # Disable user notifications for this room -> user body = { "conditions": [{"kind": "event_match", "key": "room_id", "pattern": room}], "actions": ["dont_notify"], } channel = self.make_request( "PUT", "/pushrules/global/override/best.friend", body, access_token=access_token, ) self.assertEqual(channel.code, 200) # Check we start with no pushes self.assertEqual(len(self.push_attempts), 0) # The other user joins self.helper.join(room=room, user=other_user_id, tok=other_access_token) # The other user sends a message (ignored by dont_notify push rule set above) self.helper.send(room, body="Hi!", tok=other_access_token) self.assertEqual(len(self.push_attempts), 0) # The user sends a message back (sends a notification) self.helper.send(room, body="Hello", tok=access_token) self.assertEqual(len(self.push_attempts), 1) @override_config({"experimental_features": {"msc3881_enabled": True}}) def test_disable(self) -> None: """Tests that disabling a pusher means it's not pushed to anymore.""" user_id, access_token = self._make_user_with_pusher("user") other_user_id, other_access_token = self._make_user_with_pusher("otheruser") room = self.helper.create_room_as(user_id, tok=access_token) self.helper.join(room=room, user=other_user_id, tok=other_access_token) # Send a message and check that it generated a push. self.helper.send(room, body="Hi!", tok=other_access_token) self.assertEqual(len(self.push_attempts), 1) # Disable the pusher. self._set_pusher(user_id, access_token, enabled=False) # Send another message and check that it did not generate a push. self.helper.send(room, body="Hi!", tok=other_access_token) self.assertEqual(len(self.push_attempts), 1) # Get the pushers for the user and check that it is marked as disabled. channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) enabled = channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"] self.assertFalse(enabled) self.assertTrue(isinstance(enabled, bool)) @override_config({"experimental_features": {"msc3881_enabled": True}}) def test_enable(self) -> None: """Tests that enabling a disabled pusher means it gets pushed to.""" # Create the user with the pusher already disabled. user_id, access_token = self._make_user_with_pusher("user", enabled=False) other_user_id, other_access_token = self._make_user_with_pusher("otheruser") room = self.helper.create_room_as(user_id, tok=access_token) self.helper.join(room=room, user=other_user_id, tok=other_access_token) # Send a message and check that it did not generate a push. self.helper.send(room, body="Hi!", tok=other_access_token) self.assertEqual(len(self.push_attempts), 0) # Enable the pusher. self._set_pusher(user_id, access_token, enabled=True) # Send another message and check that it did generate a push. self.helper.send(room, body="Hi!", tok=other_access_token) self.assertEqual(len(self.push_attempts), 1) # Get the pushers for the user and check that it is marked as enabled. channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) enabled = channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"] self.assertTrue(enabled) self.assertTrue(isinstance(enabled, bool)) @override_config({"experimental_features": {"msc3881_enabled": True}}) def test_null_enabled(self) -> None: """Tests that a pusher that has an 'enabled' column set to NULL (eg pushers created before the column was introduced) is considered enabled. """ # We intentionally set 'enabled' to None so that it's stored as NULL in the # database. user_id, access_token = self._make_user_with_pusher("user", enabled=None) # type: ignore[arg-type] channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) self.assertTrue(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]) def test_update_different_device_access_token_device_id(self) -> None: """Tests that if we create a pusher from one device, the update it from another device, the device ID associated with the pusher stays the same. """ # Create a user with a pusher. user_id, access_token = self._make_user_with_pusher("user") # Get the device ID for the current access token, since that's what we store in # the pushers table. user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id # Generate a new access token, and update the pusher with it. new_token = self.login("user", "pass") self._set_pusher(user_id, new_token, enabled=False) # Get the current list of pushers for the user. ret = self.get_success( self.hs.get_datastores().main.get_pushers_by({"user_name": user_id}) ) pushers: List[PusherConfig] = list(ret) # Check that we still have one pusher, and that the device ID associated with # it didn't change. self.assertEqual(len(pushers), 1) self.assertEqual(pushers[0].device_id, device_id) @override_config({"experimental_features": {"msc3881_enabled": True}}) def test_device_id(self) -> None: """Tests that a pusher created with a given device ID shows that device ID in GET /pushers requests. """ self.register_user("user", "pass") access_token = self.login("user", "pass") # We create the pusher with an HTTP request rather than with # _make_user_with_pusher so that we can test the device ID is correctly set when # creating a pusher via an API call. self.make_request( method="POST", path="/pushers/set", content={ "kind": "http", "app_id": "m.http", "app_display_name": "HTTP Push Notifications", "device_display_name": "pushy push", "pushkey": "a@example.com", "lang": "en", "data": {"url": "http://example.com/_matrix/push/v1/notify"}, }, access_token=access_token, ) # Look up the user info for the access token so we can compare the device ID. lookup_result = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert lookup_result is not None # Get the user's devices and check it has the correct device ID. channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) self.assertEqual( channel.json_body["pushers"][0]["org.matrix.msc3881.device_id"], lookup_result.device_id, ) def test_device_id_feature_flag(self) -> None: """Tests that a pusher created with a given device ID shows that device ID in GET /pushers requests when feature is enabled for the user """ user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # We create the pusher with an HTTP request rather than with # _make_user_with_pusher so that we can test the device ID is correctly set when # creating a pusher via an API call. self.make_request( method="POST", path="/pushers/set", content={ "kind": "http", "app_id": "m.http", "app_display_name": "HTTP Push Notifications", "device_display_name": "pushy push", "pushkey": "a@example.com", "lang": "en", "data": {"url": "http://example.com/_matrix/push/v1/notify"}, }, access_token=access_token, ) # Look up the user info for the access token so we can compare the device ID. store = self.hs.get_datastores().main lookup_result = self.get_success(store.get_user_by_access_token(access_token)) assert lookup_result is not None # Check field is not there before we enable the feature flag channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) self.assertNotIn( "org.matrix.msc3881.device_id", channel.json_body["pushers"][0] ) self.get_success( store.set_features_for_user(user_id, {ExperimentalFeature.MSC3881: True}) ) # Get the user's devices and check it has the correct device ID. channel = self.make_request("GET", "/pushers", access_token=access_token) self.assertEqual(channel.code, 200) self.assertEqual(len(channel.json_body["pushers"]), 1) self.assertEqual( channel.json_body["pushers"][0]["org.matrix.msc3881.device_id"], lookup_result.device_id, ) def test_msc3881_client_versions_flag(self) -> None: """Tests that MSC3881 only appears in /versions if user has it enabled.""" user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Check feature is disabled in /versions channel = self.make_request( "GET", "/_matrix/client/versions", access_token=access_token ) self.assertEqual(channel.code, 200) self.assertFalse(channel.json_body["unstable_features"]["org.matrix.msc3881"]) # Enable feature for user self.get_success( self.hs.get_datastores().main.set_features_for_user( user_id, {ExperimentalFeature.MSC3881: True} ) ) # Check feature is now enabled in /versions for user channel = self.make_request( "GET", "/_matrix/client/versions", access_token=access_token ) self.assertEqual(channel.code, 200) self.assertTrue(channel.json_body["unstable_features"]["org.matrix.msc3881"]) @override_config({"push": {"jitter_delay": "10s"}}) def test_jitter(self) -> None: """Tests that enabling jitter actually delays sending push.""" user_id, access_token = self._make_user_with_pusher("user") other_user_id, other_access_token = self._make_user_with_pusher("otheruser") room = self.helper.create_room_as(user_id, tok=access_token) self.helper.join(room=room, user=other_user_id, tok=other_access_token) # Send a message and check that it did not generate a push, as it should # be delayed. self.helper.send(room, body="Hi!", tok=other_access_token) self.assertEqual(len(self.push_attempts), 0) # Now advance time past the max jitter, and assert the message was sent. self.reactor.advance(15) self.assertEqual(len(self.push_attempts), 1) self.push_attempts[0][0].callback({}) # Now we send a bunch of messages and assert that they were all sent # within the 10s max delay. for _ in range(10): self.helper.send(room, body="Hi!", tok=other_access_token) index = 1 for _ in range(11): while len(self.push_attempts) > index: self.push_attempts[index][0].callback({}) self.pump() index += 1 self.reactor.advance(1) self.pump() self.assertEqual(len(self.push_attempts), 11) @parameterized.expand( [ # Badge count disabled (True, True), (True, False), # Badge count enabled (False, True), (False, False), ] ) @override_config({"experimental_features": {"msc4076_enabled": True}}) def test_msc4076_badge_count( self, disable_badge_count: bool, event_id_only: bool ) -> None: # Register the user who gets notified user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") # Register the user who sends the message other_user_id = self.register_user("otheruser", "pass") other_access_token = self.login("otheruser", "pass") # Register the pusher with disable_badge_count set to True user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None device_id = user_tuple.device_id # Set the push data dict based on test input parameters push_data: Dict[str, Any] = { "url": "http://example.com/_matrix/push/v1/notify", } if disable_badge_count: push_data["org.matrix.msc4076.disable_badge_count"] = True if event_id_only: push_data["format"] = "event_id_only" self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", device_display_name="pushy push", pushkey="a@example.com", lang=None, data=push_data, ) ) # Create a room room = self.helper.create_room_as(user_id, tok=access_token) # The other user joins self.helper.join(room=room, user=other_user_id, tok=other_access_token) # The other user sends a message self.helper.send(room, body="Hi!", tok=other_access_token) # Advance time a bit, so the pusher will register something has happened self.pump() # One push was attempted to be sent self.assertEqual(len(self.push_attempts), 1) self.assertEqual( self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify" ) if disable_badge_count: # Verify that the notification DOESN'T contain a counts field self.assertNotIn("counts", self.push_attempts[0][2]["notification"]) else: # Ensure that the notification DOES contain a counts field self.assertIn("counts", self.push_attempts[0][2]["notification"]) self.assertEqual( self.push_attempts[0][2]["notification"]["counts"]["unread"], 1 )