mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-24 18:45:52 +03:00
Start chagning the events stream to work with the new DB schema
This commit is contained in:
parent
d72f897f07
commit
114984a236
8 changed files with 102 additions and 118 deletions
|
@ -28,17 +28,17 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class MessagesStreamData(StreamData):
|
class EventsStreamData(StreamData):
|
||||||
EVENT_TYPE = MessageEvent.TYPE
|
EVENT_TYPE = "EventsStream"
|
||||||
|
|
||||||
def __init__(self, hs, room_id=None, feedback=False):
|
def __init__(self, hs, room_id=None, feedback=False):
|
||||||
super(MessagesStreamData, self).__init__(hs)
|
super(EventsStreamData, self).__init__(hs)
|
||||||
self.room_id = room_id
|
self.room_id = room_id
|
||||||
self.with_feedback = feedback
|
self.with_feedback = feedback
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_rows(self, user_id, from_key, to_key, limit):
|
def get_rows(self, user_id, from_key, to_key, limit):
|
||||||
(data, latest_ver) = yield self.store.get_message_stream(
|
data, latest_ver = yield self.store.get_room_events_stream(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
from_key=from_key,
|
from_key=from_key,
|
||||||
to_key=to_key,
|
to_key=to_key,
|
||||||
|
@ -50,74 +50,7 @@ class MessagesStreamData(StreamData):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def max_token(self):
|
def max_token(self):
|
||||||
val = yield self.store.get_max_message_id()
|
val = yield self.store.get_room_events_max_id()
|
||||||
defer.returnValue(val)
|
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberStreamData(StreamData):
|
|
||||||
EVENT_TYPE = RoomMemberEvent.TYPE
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_rows(self, user_id, from_key, to_key, limit):
|
|
||||||
(data, latest_ver) = yield self.store.get_room_member_stream(
|
|
||||||
user_id=user_id,
|
|
||||||
from_key=from_key,
|
|
||||||
to_key=to_key
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue((data, latest_ver))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def max_token(self):
|
|
||||||
val = yield self.store.get_max_room_member_id()
|
|
||||||
defer.returnValue(val)
|
|
||||||
|
|
||||||
|
|
||||||
class FeedbackStreamData(StreamData):
|
|
||||||
EVENT_TYPE = FeedbackEvent.TYPE
|
|
||||||
|
|
||||||
def __init__(self, hs, room_id=None):
|
|
||||||
super(FeedbackStreamData, self).__init__(hs)
|
|
||||||
self.room_id = room_id
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_rows(self, user_id, from_key, to_key, limit):
|
|
||||||
(data, latest_ver) = yield self.store.get_feedback_stream(
|
|
||||||
user_id=user_id,
|
|
||||||
from_key=from_key,
|
|
||||||
to_key=to_key,
|
|
||||||
limit=limit,
|
|
||||||
room_id=self.room_id
|
|
||||||
)
|
|
||||||
defer.returnValue((data, latest_ver))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def max_token(self):
|
|
||||||
val = yield self.store.get_max_feedback_id()
|
|
||||||
defer.returnValue(val)
|
|
||||||
|
|
||||||
|
|
||||||
class RoomDataStreamData(StreamData):
|
|
||||||
EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types
|
|
||||||
|
|
||||||
def __init__(self, hs, room_id=None):
|
|
||||||
super(RoomDataStreamData, self).__init__(hs)
|
|
||||||
self.room_id = room_id
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_rows(self, user_id, from_key, to_key, limit):
|
|
||||||
(data, latest_ver) = yield self.store.get_room_data_stream(
|
|
||||||
user_id=user_id,
|
|
||||||
from_key=from_key,
|
|
||||||
to_key=to_key,
|
|
||||||
limit=limit,
|
|
||||||
room_id=self.room_id
|
|
||||||
)
|
|
||||||
defer.returnValue((data, latest_ver))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def max_token(self):
|
|
||||||
val = yield self.store.get_max_room_data_id()
|
|
||||||
defer.returnValue(val)
|
defer.returnValue(val)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,8 +17,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
from synapse.api.streams.event import (
|
from synapse.api.streams.event import (
|
||||||
EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
|
EventStream, EventsStreamData
|
||||||
RoomDataStreamData
|
|
||||||
)
|
)
|
||||||
from synapse.handlers.presence import PresenceStreamData
|
from synapse.handlers.presence import PresenceStreamData
|
||||||
|
|
||||||
|
@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
|
||||||
class EventStreamHandler(BaseHandler):
|
class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
stream_data_classes = [
|
stream_data_classes = [
|
||||||
MessagesStreamData,
|
EventsStreamData,
|
||||||
RoomMemberStreamData,
|
|
||||||
FeedbackStreamData,
|
|
||||||
RoomDataStreamData,
|
|
||||||
PresenceStreamData,
|
PresenceStreamData,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ from synapse.api.events.room import (
|
||||||
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
|
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
|
||||||
RoomConfigEvent
|
RoomConfigEvent
|
||||||
)
|
)
|
||||||
from synapse.api.streams.event import EventStream, MessagesStreamData
|
from synapse.api.streams.event import EventStream, EventsStreamData
|
||||||
from synapse.util import stringutils
|
from synapse.util import stringutils
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
|
@ -97,30 +97,30 @@ class MessageHandler(BaseHandler):
|
||||||
self.notifier.on_new_room_event(event, store_id)
|
self.notifier.on_new_room_event(event, store_id)
|
||||||
|
|
||||||
yield self.hs.get_federation().handle_new_event(event)
|
yield self.hs.get_federation().handle_new_event(event)
|
||||||
|
#
|
||||||
@defer.inlineCallbacks
|
# @defer.inlineCallbacks
|
||||||
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
|
# def get_messages(self, user_id=None, room_id=None, pagin_config=None,
|
||||||
feedback=False):
|
# feedback=False):
|
||||||
"""Get messages in a room.
|
# """Get messages in a room.
|
||||||
|
#
|
||||||
Args:
|
# Args:
|
||||||
user_id (str): The user requesting messages.
|
# user_id (str): The user requesting messages.
|
||||||
room_id (str): The room they want messages from.
|
# room_id (str): The room they want messages from.
|
||||||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
# pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||||
config rules to apply, if any.
|
# config rules to apply, if any.
|
||||||
feedback (bool): True to get compressed feedback with the messages
|
# feedback (bool): True to get compressed feedback with the messages
|
||||||
Returns:
|
# Returns:
|
||||||
dict: Pagination API results
|
# dict: Pagination API results
|
||||||
"""
|
# """
|
||||||
yield self.auth.check_joined_room(room_id, user_id)
|
# yield self.auth.check_joined_room(room_id, user_id)
|
||||||
|
#
|
||||||
data_source = [MessagesStreamData(self.hs, room_id=room_id,
|
# data_source = [MessagesStreamData(self.hs, room_id=room_id,
|
||||||
feedback=feedback)]
|
# feedback=feedback)]
|
||||||
event_stream = EventStream(user_id, data_source)
|
# event_stream = EventStream(user_id, data_source)
|
||||||
pagin_config = yield event_stream.fix_tokens(pagin_config)
|
# pagin_config = yield event_stream.fix_tokens(pagin_config)
|
||||||
data_chunk = yield event_stream.get_chunk(config=pagin_config)
|
# data_chunk = yield event_stream.get_chunk(config=pagin_config)
|
||||||
defer.returnValue(data_chunk)
|
# defer.returnValue(data_chunk)
|
||||||
|
#
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def store_room_data(self, event=None, stamp_event=True):
|
def store_room_data(self, event=None, stamp_event=True):
|
||||||
""" Stores data for a room.
|
""" Stores data for a room.
|
||||||
|
@ -251,20 +251,27 @@ class MessageHandler(BaseHandler):
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
membership_list=[Membership.INVITE, Membership.JOIN]
|
membership_list=[Membership.INVITE, Membership.JOIN]
|
||||||
)
|
)
|
||||||
for room_info in room_list:
|
|
||||||
if room_info["membership"] != Membership.JOIN:
|
ret = []
|
||||||
|
|
||||||
|
for event in room_list:
|
||||||
|
d = event.get_dict()
|
||||||
|
ret.append(d)
|
||||||
|
|
||||||
|
if event.membership != Membership.JOIN:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
event_chunk = yield self.get_messages(
|
messages = yield self.store.get_recent_events_for_room(
|
||||||
user_id=user_id,
|
event.room_id,
|
||||||
pagin_config=pagin_config,
|
limit=50,
|
||||||
feedback=feedback,
|
|
||||||
room_id=room_info["room_id"]
|
|
||||||
)
|
)
|
||||||
room_info["messages"] = event_chunk
|
d["messages"] = [m.get_dict() for m in messages]
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
defer.returnValue(room_list)
|
|
||||||
|
logger.debug("snapshot_all_rooms returning: %s", ret)
|
||||||
|
|
||||||
|
defer.returnValue(ret)
|
||||||
|
|
||||||
|
|
||||||
class RoomCreationHandler(BaseHandler):
|
class RoomCreationHandler(BaseHandler):
|
||||||
|
@ -442,7 +449,7 @@ class RoomMemberHandler(BaseHandler):
|
||||||
|
|
||||||
member_list = yield self.store.get_room_members(room_id=room_id)
|
member_list = yield self.store.get_room_members(room_id=room_id)
|
||||||
event_list = [
|
event_list = [
|
||||||
entry.as_event(self.event_factory).get_dict()
|
entry.get_dict()
|
||||||
for entry in member_list
|
for entry in member_list
|
||||||
]
|
]
|
||||||
chunk_data = {
|
chunk_data = {
|
||||||
|
@ -685,7 +692,7 @@ class RoomMemberHandler(BaseHandler):
|
||||||
user_id=user.to_string(), membership_list=membership_list
|
user_id=user.to_string(), membership_list=membership_list
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue([r["room_id"] for r in rooms])
|
defer.returnValue([r.room_id for r in rooms])
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _do_local_membership_update(self, event, membership, broadcast_msg):
|
def _do_local_membership_update(self, event, membership, broadcast_msg):
|
||||||
|
|
|
@ -104,7 +104,15 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
|
|
||||||
yield self._simple_insert("state_events", vals)
|
yield self._simple_insert("state_events", vals)
|
||||||
|
|
||||||
# TODO (erikj): We also need to update the current state table?
|
yield self._simple_insert(
|
||||||
|
"current_state_events",
|
||||||
|
{
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"type": event.type,
|
||||||
|
"state_key": event.state_key,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_current_state(self, room_id, event_type=None, state_key=""):
|
def get_current_state(self, room_id, event_type=None, state_key=""):
|
||||||
|
|
|
@ -293,7 +293,7 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
def _parse_event_from_row(self, row_dict):
|
def _parse_event_from_row(self, row_dict):
|
||||||
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
|
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
|
||||||
d.update(json.loads(json.loads(row_dict["unrecognized_keys"])))
|
d.update(json.loads(row_dict["unrecognized_keys"]))
|
||||||
d["content"] = json.loads(d["content"])
|
d["content"] = json.loads(d["content"])
|
||||||
del d["unrecognized_keys"]
|
del d["unrecognized_keys"]
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,7 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
yield self._execute(None, sql, event.room_id, domain)
|
yield self._execute(None, sql, event.room_id, domain)
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def get_room_member(self, user_id, room_id):
|
def get_room_member(self, user_id, room_id):
|
||||||
"""Retrieve the current state of a room member.
|
"""Retrieve the current state of a room member.
|
||||||
|
|
||||||
|
@ -72,11 +73,13 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Results in a MembershipEvent or None.
|
Deferred: Results in a MembershipEvent or None.
|
||||||
"""
|
"""
|
||||||
return self._get_members_by_dict({
|
rows = yield self._get_members_by_dict({
|
||||||
"e.room_id": room_id,
|
"e.room_id": room_id,
|
||||||
"m.user_id": user_id,
|
"m.user_id": user_id,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
defer.returnValue(rows[0] if rows else None)
|
||||||
|
|
||||||
def get_room_members(self, room_id, membership=None):
|
def get_room_members(self, room_id, membership=None):
|
||||||
"""Retrieve the current room member list for a room.
|
"""Retrieve the current room member list for a room.
|
||||||
|
|
||||||
|
@ -142,5 +145,8 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
) % (where_clause,)
|
) % (where_clause,)
|
||||||
|
|
||||||
rows = yield self._execute_and_decode(sql, *where_values)
|
rows = yield self._execute_and_decode(sql, *where_values)
|
||||||
|
|
||||||
|
logger.debug("_get_members_query Got rows %s", rows)
|
||||||
|
|
||||||
results = [self._parse_event_from_row(r) for r in rows]
|
results = [self._parse_event_from_row(r) for r in rows]
|
||||||
defer.returnValue(results)
|
defer.returnValue(results)
|
||||||
|
|
|
@ -32,7 +32,10 @@ CREATE TABLE IF NOT EXISTS state_events(
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS current_state_events(
|
CREATE TABLE IF NOT EXISTS current_state_events(
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
room_id TEXT NOT NULL
|
room_id TEXT NOT NULL,
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
state_key TEXT NOT NULL,
|
||||||
|
CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS room_memberships(
|
CREATE TABLE IF NOT EXISTS room_memberships(
|
||||||
|
|
|
@ -34,6 +34,7 @@ class StreamStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
|
||||||
limit=0, with_feedback=False):
|
limit=0, with_feedback=False):
|
||||||
|
# TODO (erikj): Handle compressed feedback
|
||||||
|
|
||||||
current_room_membership_sql = (
|
current_room_membership_sql = (
|
||||||
"SELECT m.room_id FROM room_memberships as m "
|
"SELECT m.room_id FROM room_memberships as m "
|
||||||
|
@ -69,3 +70,33 @@ class StreamStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue([self._parse_event_from_row(r) for r in rows])
|
defer.returnValue([self._parse_event_from_row(r) for r in rows])
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
|
||||||
|
# TODO (erikj): Handle compressed feedback
|
||||||
|
|
||||||
|
sql = (
|
||||||
|
"SELECT * FROM events WHERE room_id = ? "
|
||||||
|
"ORDER BY ordering DESC LIMIT ? "
|
||||||
|
)
|
||||||
|
|
||||||
|
rows = yield self._execute_and_decode(
|
||||||
|
sql,
|
||||||
|
room_id, limit
|
||||||
|
)
|
||||||
|
|
||||||
|
rows.reverse() # As we selected with reverse ordering
|
||||||
|
|
||||||
|
defer.returnValue([self._parse_event_from_row(r) for r in rows])
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_room_events_max_id(self):
|
||||||
|
res = yield self._execute_and_decode(
|
||||||
|
"SELECT MAX(ordering) as m FROM events"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not res:
|
||||||
|
defer.returnValue(0)
|
||||||
|
return
|
||||||
|
|
||||||
|
defer.returnValue(res[0]["m"])
|
||||||
|
|
Loading…
Reference in a new issue