first draft of remove presence lists

This commit is contained in:
Neil Johnson 2019-04-02 12:44:46 +01:00
parent 4c552ed78a
commit 7adb9d06a2
5 changed files with 264 additions and 381 deletions

View file

@ -113,27 +113,27 @@ class PresenceHandler(object):
federation_registry.register_edu_handler( federation_registry.register_edu_handler(
"m.presence", self.incoming_presence "m.presence", self.incoming_presence
) )
federation_registry.register_edu_handler( # federation_registry.register_edu_handler(
"m.presence_invite", # "m.presence_invite",
lambda origin, content: self.invite_presence( # lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]), # observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]), # observer_user=UserID.from_string(content["observer_user"]),
) # )
) # )
federation_registry.register_edu_handler( # federation_registry.register_edu_handler(
"m.presence_accept", # "m.presence_accept",
lambda origin, content: self.accept_presence( # lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]), # observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]), # observer_user=UserID.from_string(content["observer_user"]),
) # )
) # )
federation_registry.register_edu_handler( # federation_registry.register_edu_handler(
"m.presence_deny", # "m.presence_deny",
lambda origin, content: self.deny_presence( # lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]), # observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]), # observer_user=UserID.from_string(content["observer_user"]),
) # )
) # )
active_presence = self.store.take_presence_startup_info() active_presence = self.store.take_presence_startup_info()
@ -759,136 +759,136 @@ class PresenceHandler(object):
yield self._update_states([prev_state.copy_and_replace(**new_fields)]) yield self._update_states([prev_state.copy_and_replace(**new_fields)])
@defer.inlineCallbacks # @defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None): # def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list. # """Returns the presence for all users in their presence list.
""" # """
if not self.is_mine(observer_user): # if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server") # raise SynapseError(400, "User is not hosted on this Home Server")
presence_list = yield self.store.get_presence_list( # presence_list = yield self.store.get_presence_list(
observer_user.localpart, accepted=accepted # observer_user.localpart, accepted=accepted
) # )
results = yield self.get_states( # results = yield self.get_states(
target_user_ids=[row["observed_user_id"] for row in presence_list], # target_user_ids=[row["observed_user_id"] for row in presence_list],
as_event=False, # as_event=False,
) # )
now = self.clock.time_msec() # now = self.clock.time_msec()
results[:] = [format_user_presence_state(r, now) for r in results] # results[:] = [format_user_presence_state(r, now) for r in results]
is_accepted = { # is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list # row["observed_user_id"]: row["accepted"] for row in presence_list
} # }
for result in results: # for result in results:
result.update({ # result.update({
"accepted": is_accepted, # "accepted": is_accepted,
}) # })
defer.returnValue(results) # defer.returnValue(results)
@defer.inlineCallbacks # @defer.inlineCallbacks
def send_presence_invite(self, observer_user, observed_user): # def send_presence_invite(self, observer_user, observed_user):
"""Sends a presence invite. # """Sends a presence invite.
""" # """
yield self.store.add_presence_list_pending( # yield self.store.add_presence_list_pending(
observer_user.localpart, observed_user.to_string() # observer_user.localpart, observed_user.to_string()
) # )
if self.is_mine(observed_user): # if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user) # yield self.invite_presence(observed_user, observer_user)
else: # else:
yield self.federation.build_and_send_edu( # yield self.federation.build_and_send_edu(
destination=observed_user.domain, # destination=observed_user.domain,
edu_type="m.presence_invite", # edu_type="m.presence_invite",
content={ # content={
"observed_user": observed_user.to_string(), # "observed_user": observed_user.to_string(),
"observer_user": observer_user.to_string(), # "observer_user": observer_user.to_string(),
} # }
) # )
@defer.inlineCallbacks # @defer.inlineCallbacks
def invite_presence(self, observed_user, observer_user): # def invite_presence(self, observed_user, observer_user):
"""Handles new presence invites. # """Handles new presence invites.
""" # """
if not self.is_mine(observed_user): # if not self.is_mine(observed_user):
raise SynapseError(400, "User is not hosted on this Home Server") # raise SynapseError(400, "User is not hosted on this Home Server")
# TODO: Don't auto accept # # TODO: Don't auto accept
if self.is_mine(observer_user): # if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user) # yield self.accept_presence(observed_user, observer_user)
else: # else:
self.federation.build_and_send_edu( # self.federation.build_and_send_edu(
destination=observer_user.domain, # destination=observer_user.domain,
edu_type="m.presence_accept", # edu_type="m.presence_accept",
content={ # content={
"observed_user": observed_user.to_string(), # "observed_user": observed_user.to_string(),
"observer_user": observer_user.to_string(), # "observer_user": observer_user.to_string(),
} # }
) # )
state_dict = yield self.get_state(observed_user, as_event=False) # state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) # state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
self.federation.build_and_send_edu( # self.federation.build_and_send_edu(
destination=observer_user.domain, # destination=observer_user.domain,
edu_type="m.presence", # edu_type="m.presence",
content={ # content={
"push": [state_dict] # "push": [state_dict]
} # }
) # )
@defer.inlineCallbacks # @defer.inlineCallbacks
def accept_presence(self, observed_user, observer_user): # def accept_presence(self, observed_user, observer_user):
"""Handles a m.presence_accept EDU. Mark a presence invite from a # """Handles a m.presence_accept EDU. Mark a presence invite from a
local or remote user as accepted in a local user's presence list. # local or remote user as accepted in a local user's presence list.
Starts polling for presence updates from the local or remote user. # Starts polling for presence updates from the local or remote user.
Args: # Args:
observed_user(UserID): The user to update in the presence list. # observed_user(UserID): The user to update in the presence list.
observer_user(UserID): The owner of the presence list to update. # observer_user(UserID): The owner of the presence list to update.
""" # """
yield self.store.set_presence_list_accepted( # yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string() # observer_user.localpart, observed_user.to_string()
) # )
@defer.inlineCallbacks # @defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user): # def deny_presence(self, observed_user, observer_user):
"""Handle a m.presence_deny EDU. Removes a local or remote user from a # """Handle a m.presence_deny EDU. Removes a local or remote user from a
local user's presence list. # local user's presence list.
Args: # Args:
observed_user(UserID): The local or remote user to remove from the # observed_user(UserID): The local or remote user to remove from the
list. # list.
observer_user(UserID): The local owner of the presence list. # observer_user(UserID): The local owner of the presence list.
Returns: # Returns:
A Deferred. # A Deferred.
""" # """
yield self.store.del_presence_list( # yield self.store.del_presence_list(
observer_user.localpart, observed_user.to_string() # observer_user.localpart, observed_user.to_string()
) # )
# TODO(paul): Inform the user somehow? # TODO(paul): Inform the user somehow?
@defer.inlineCallbacks # @defer.inlineCallbacks
def drop(self, observed_user, observer_user): # def drop(self, observed_user, observer_user):
"""Remove a local or remote user from a local user's presence list and # """Remove a local or remote user from a local user's presence list and
unsubscribe the local user from updates that user. # unsubscribe the local user from updates that user.
Args: # Args:
observed_user(UserId): The local or remote user to remove from the # observed_user(UserId): The local or remote user to remove from the
list. # list.
observer_user(UserId): The local owner of the presence list. # observer_user(UserId): The local owner of the presence list.
Returns: # Returns:
A Deferred. # A Deferred.
""" # """
if not self.is_mine(observer_user): # if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server") # raise SynapseError(400, "User is not hosted on this Home Server")
yield self.store.del_presence_list( # yield self.store.del_presence_list(
observer_user.localpart, observed_user.to_string() # observer_user.localpart, observed_user.to_string()
) # )
# TODO: Inform the remote that we've dropped the presence list. # # TODO: Inform the remote that we've dropped the presence list.
@defer.inlineCallbacks @defer.inlineCallbacks
def is_visible(self, observed_user, observer_user): def is_visible(self, observed_user, observer_user):
@ -904,11 +904,11 @@ class PresenceHandler(object):
if observer_room_ids & observed_room_ids: if observer_room_ids & observed_room_ids:
defer.returnValue(True) defer.returnValue(True)
accepted_observers = yield self.store.get_presence_list_observers_accepted( # accepted_observers = yield self.store.get_presence_list_observers_accepted(
observed_user.to_string() # observed_user.to_string()
) # )
defer.returnValue(observer_user.to_string() in accepted_observers) defer.returnValue(False)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_all_presence_updates(self, last_id, current_id): def get_all_presence_updates(self, last_id, current_id):
@ -1204,10 +1204,11 @@ class PresenceEventSource(object):
updates for updates for
""" """
user_id = user.to_string() user_id = user.to_string()
plist = yield self.store.get_presence_list_accepted( # plist = yield self.store.get_presence_list_accepted(
user.localpart, on_invalidate=cache_context.invalidate, # user.localpart, on_invalidate=cache_context.invalidate,
) # )
users_interested_in = set(row["observed_user_id"] for row in plist) # users_interested_in = set(row["observed_user_id"] for row in plist)
users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = yield self.store.get_users_who_share_room_with_user( users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@ -1412,9 +1413,9 @@ def get_interested_parties(store, states):
for room_id in room_ids: for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state) room_ids_to_states.setdefault(room_id, []).append(state)
plist = yield store.get_presence_list_observers_accepted(state.user_id) # plist = yield store.get_presence_list_observers_accepted(state.user_id)
for u in plist: # for u in plist:
users_to_states.setdefault(u, []).append(state) # users_to_states.setdefault(u, []).append(state)
# Always notify self # Always notify self
users_to_states.setdefault(state.user_id, []).append(state) users_to_states.setdefault(state.user_id, []).append(state)

View file

@ -42,12 +42,12 @@ class SlavedPresenceStore(BaseSlavedStore):
# XXX: This is a bit broken because we don't persist the accepted list in a # XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to # way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly. # invalidate the cache correctly.
get_presence_list_accepted = PresenceStore.__dict__[ # get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted" # "get_presence_list_accepted"
] # ]
get_presence_list_observers_accepted = PresenceStore.__dict__[ # get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted" # "get_presence_list_observers_accepted"
] # ]
def get_current_presence_token(self): def get_current_presence_token(self):
return self._presence_id_gen.get_current_token() return self._presence_id_gen.get_current_token()

View file

@ -100,60 +100,60 @@ class PresenceListRestServlet(ClientV1RestServlet):
super(PresenceListRestServlet, self).__init__(hs) super(PresenceListRestServlet, self).__init__(hs)
self.presence_handler = hs.get_presence_handler() self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks # @defer.inlineCallbacks
def on_GET(self, request, user_id): # def on_GET(self, request, user_id):
requester = yield self.auth.get_user_by_req(request) # requester = yield self.auth.get_user_by_req(request)
user = UserID.from_string(user_id) # user = UserID.from_string(user_id)
if not self.hs.is_mine(user): # if not self.hs.is_mine(user):
raise SynapseError(400, "User not hosted on this Home Server") # raise SynapseError(400, "User not hosted on this Home Server")
if requester.user != user: # if requester.user != user:
raise SynapseError(400, "Cannot get another user's presence list") # raise SynapseError(400, "Cannot get another user's presence list")
presence = yield self.presence_handler.get_presence_list( # presence = yield self.presence_handler.get_presence_list(
observer_user=user, accepted=True # observer_user=user, accepted=True
) # )
defer.returnValue((200, presence)) # defer.returnValue((200, presence))
@defer.inlineCallbacks # @defer.inlineCallbacks
def on_POST(self, request, user_id): # def on_POST(self, request, user_id):
requester = yield self.auth.get_user_by_req(request) # requester = yield self.auth.get_user_by_req(request)
user = UserID.from_string(user_id) # user = UserID.from_string(user_id)
if not self.hs.is_mine(user): # if not self.hs.is_mine(user):
raise SynapseError(400, "User not hosted on this Home Server") # raise SynapseError(400, "User not hosted on this Home Server")
if requester.user != user: # if requester.user != user:
raise SynapseError( # raise SynapseError(
400, "Cannot modify another user's presence list") # 400, "Cannot modify another user's presence list")
content = parse_json_object_from_request(request) # content = parse_json_object_from_request(request)
if "invite" in content: # if "invite" in content:
for u in content["invite"]: # for u in content["invite"]:
if not isinstance(u, string_types): # if not isinstance(u, string_types):
raise SynapseError(400, "Bad invite value.") # raise SynapseError(400, "Bad invite value.")
if len(u) == 0: # if len(u) == 0:
continue # continue
invited_user = UserID.from_string(u) # invited_user = UserID.from_string(u)
yield self.presence_handler.send_presence_invite( # yield self.presence_handler.send_presence_invite(
observer_user=user, observed_user=invited_user # observer_user=user, observed_user=invited_user
) # )
if "drop" in content: # # if "drop" in content:
for u in content["drop"]: # # for u in content["drop"]:
if not isinstance(u, string_types): # # if not isinstance(u, string_types):
raise SynapseError(400, "Bad drop value.") # # raise SynapseError(400, "Bad drop value.")
if len(u) == 0: # # if len(u) == 0:
continue # # continue
dropped_user = UserID.from_string(u) # # dropped_user = UserID.from_string(u)
yield self.presence_handler.drop( # # yield self.presence_handler.drop(
observer_user=user, observed_user=dropped_user # # observer_user=user, observed_user=dropped_user
) # # )
defer.returnValue((200, {})) # defer.returnValue((200, {}))
def on_OPTIONS(self, request): def on_OPTIONS(self, request):
return (200, {}) return (200, {})

View file

@ -194,84 +194,84 @@ class PresenceStore(SQLBaseStore):
desc="disallow_presence_visible", desc="disallow_presence_visible",
) )
def add_presence_list_pending(self, observer_localpart, observed_userid): # def add_presence_list_pending(self, observer_localpart, observed_userid):
return self._simple_insert( # return self._simple_insert(
table="presence_list", # table="presence_list",
values={"user_id": observer_localpart, # values={"user_id": observer_localpart,
"observed_user_id": observed_userid, # "observed_user_id": observed_userid,
"accepted": False}, # "accepted": False},
desc="add_presence_list_pending", # desc="add_presence_list_pending",
) # )
def set_presence_list_accepted(self, observer_localpart, observed_userid): # def set_presence_list_accepted(self, observer_localpart, observed_userid):
def update_presence_list_txn(txn): # def update_presence_list_txn(txn):
result = self._simple_update_one_txn( # result = self._simple_update_one_txn(
txn, # txn,
table="presence_list", # table="presence_list",
keyvalues={ # keyvalues={
"user_id": observer_localpart, # "user_id": observer_localpart,
"observed_user_id": observed_userid # "observed_user_id": observed_userid
}, # },
updatevalues={"accepted": True}, # updatevalues={"accepted": True},
) # )
self._invalidate_cache_and_stream( # self._invalidate_cache_and_stream(
txn, self.get_presence_list_accepted, (observer_localpart,) # txn, self.get_presence_list_accepted, (observer_localpart,)
) # )
self._invalidate_cache_and_stream( # self._invalidate_cache_and_stream(
txn, self.get_presence_list_observers_accepted, (observed_userid,) # txn, self.get_presence_list_observers_accepted, (observed_userid,)
) # )
return result # return result
return self.runInteraction( # return self.runInteraction(
"set_presence_list_accepted", update_presence_list_txn, # "set_presence_list_accepted", update_presence_list_txn,
) # )
def get_presence_list(self, observer_localpart, accepted=None): # def get_presence_list(self, observer_localpart, accepted=None):
if accepted: # if accepted:
return self.get_presence_list_accepted(observer_localpart) # return self.get_presence_list_accepted(observer_localpart)
else: # else:
keyvalues = {"user_id": observer_localpart} # keyvalues = {"user_id": observer_localpart}
if accepted is not None: # if accepted is not None:
keyvalues["accepted"] = accepted # keyvalues["accepted"] = accepted
return self._simple_select_list( # return self._simple_select_list(
table="presence_list", # table="presence_list",
keyvalues=keyvalues, # keyvalues=keyvalues,
retcols=["observed_user_id", "accepted"], # retcols=["observed_user_id", "accepted"],
desc="get_presence_list", # desc="get_presence_list",
) # )
@cached() # @cached()
def get_presence_list_accepted(self, observer_localpart): # def get_presence_list_accepted(self, observer_localpart):
return self._simple_select_list( # return self._simple_select_list(
table="presence_list", # table="presence_list",
keyvalues={"user_id": observer_localpart, "accepted": True}, # keyvalues={"user_id": observer_localpart, "accepted": True},
retcols=["observed_user_id", "accepted"], # retcols=["observed_user_id", "accepted"],
desc="get_presence_list_accepted", # desc="get_presence_list_accepted",
) # )
@cachedInlineCallbacks() # @cachedInlineCallbacks()
def get_presence_list_observers_accepted(self, observed_userid): # def get_presence_list_observers_accepted(self, observed_userid):
user_localparts = yield self._simple_select_onecol( # user_localparts = yield self._simple_select_onecol(
table="presence_list", # table="presence_list",
keyvalues={"observed_user_id": observed_userid, "accepted": True}, # keyvalues={"observed_user_id": observed_userid, "accepted": True},
retcol="user_id", # retcol="user_id",
desc="get_presence_list_accepted", # desc="get_presence_list_accepted",
) # )
defer.returnValue([ # defer.returnValue([
"@%s:%s" % (u, self.hs.hostname,) for u in user_localparts # "@%s:%s" % (u, self.hs.hostname,) for u in user_localparts
]) # ])
@defer.inlineCallbacks # @defer.inlineCallbacks
def del_presence_list(self, observer_localpart, observed_userid): # def del_presence_list(self, observer_localpart, observed_userid):
yield self._simple_delete_one( # yield self._simple_delete_one(
table="presence_list", # table="presence_list",
keyvalues={"user_id": observer_localpart, # keyvalues={"user_id": observer_localpart,
"observed_user_id": observed_userid}, # "observed_user_id": observed_userid},
desc="del_presence_list", # desc="del_presence_list",
) # )
self.get_presence_list_accepted.invalidate((observer_localpart,)) # self.get_presence_list_accepted.invalidate((observer_localpart,))
self.get_presence_list_observers_accepted.invalidate((observed_userid,)) # self.get_presence_list_observers_accepted.invalidate((observed_userid,))

View file

@ -1,118 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.types import UserID
from tests import unittest
from tests.utils import setup_test_homeserver
class PresenceStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
hs = yield setup_test_homeserver(self.addCleanup)
self.store = hs.get_datastore()
self.u_apple = UserID.from_string("@apple:test")
self.u_banana = UserID.from_string("@banana:test")
@defer.inlineCallbacks
def test_presence_list(self):
self.assertEquals(
[],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart
)
),
)
self.assertEquals(
[],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart, accepted=True
)
),
)
yield self.store.add_presence_list_pending(
observer_localpart=self.u_apple.localpart,
observed_userid=self.u_banana.to_string(),
)
self.assertEquals(
[{"observed_user_id": "@banana:test", "accepted": 0}],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart
)
),
)
self.assertEquals(
[],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart, accepted=True
)
),
)
yield self.store.set_presence_list_accepted(
observer_localpart=self.u_apple.localpart,
observed_userid=self.u_banana.to_string(),
)
self.assertEquals(
[{"observed_user_id": "@banana:test", "accepted": 1}],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart
)
),
)
self.assertEquals(
[{"observed_user_id": "@banana:test", "accepted": 1}],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart, accepted=True
)
),
)
yield self.store.del_presence_list(
observer_localpart=self.u_apple.localpart,
observed_userid=self.u_banana.to_string(),
)
self.assertEquals(
[],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart
)
),
)
self.assertEquals(
[],
(
yield self.store.get_presence_list(
observer_localpart=self.u_apple.localpart, accepted=True
)
),
)