From cfdcf45ac6eb019242b5d969ce8018fae195caec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 12 Nov 2019 13:29:07 +0100 Subject: [PATCH] MatrixChat: Move the event indexing logic into separate modules. --- src/EventIndexPeg.js | 74 +++++ src/EventIndexing.js | 404 ++++++++++++++++++++++++ src/Lifecycle.js | 2 + src/MatrixClientPeg.js | 4 +- src/components/structures/MatrixChat.js | 371 ++-------------------- 5 files changed, 499 insertions(+), 356 deletions(-) create mode 100644 src/EventIndexPeg.js create mode 100644 src/EventIndexing.js diff --git a/src/EventIndexPeg.js b/src/EventIndexPeg.js new file mode 100644 index 0000000000..794450e4b7 --- /dev/null +++ b/src/EventIndexPeg.js @@ -0,0 +1,74 @@ +/* +Copyright 2019 New Vector Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* + * Holds the current Platform object used by the code to do anything + * specific to the platform we're running on (eg. web, electron) + * Platforms are provided by the app layer. + * This allows the app layer to set a Platform without necessarily + * having to have a MatrixChat object + */ + +import PlatformPeg from "./PlatformPeg"; +import EventIndex from "./EventIndexing"; +import MatrixClientPeg from "./MatrixClientPeg"; + +class EventIndexPeg { + constructor() { + this.index = null; + } + + /** + * Returns the current Event index object for the application. Can be null + * if the platform doesn't support event indexing. + */ + get() { + return this.index; + } + + /** Create a new EventIndex and initialize it if the platform supports it. + * Returns true if an EventIndex was successfully initialized, false + * otherwise. + */ + async init() { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return false; + + let index = new EventIndex(); + + const userId = MatrixClientPeg.get().getUserId(); + // TODO log errors here and return false if it errors out. + await index.init(userId); + this.index = index; + + return true + } + + async stop() { + if (this.index == null) return; + index.stopCrawler(); + } + + async deleteEventIndex() { + if (this.index == null) return; + index.deleteEventIndex(); + } +} + +if (!global.mxEventIndexPeg) { + global.mxEventIndexPeg = new EventIndexPeg(); +} +module.exports = global.mxEventIndexPeg; diff --git a/src/EventIndexing.js b/src/EventIndexing.js new file mode 100644 index 0000000000..21ee8f3da6 --- /dev/null +++ b/src/EventIndexing.js @@ -0,0 +1,404 @@ +/* +Copyright 2019 New Vector Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import PlatformPeg from "./PlatformPeg"; +import MatrixClientPeg from "./MatrixClientPeg"; + +/** + * Event indexing class that wraps the platform specific event indexing. + */ +export default class EventIndexer { + constructor() { + this.crawlerChekpoints = []; + // The time that the crawler will wait between /rooms/{room_id}/messages + // requests + this._crawler_timeout = 3000; + this._crawlerRef = null; + this.liveEventsForIndex = new Set(); + } + + async init(userId) { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return false; + platform.initEventIndex(userId); + } + + async onSync(state, prevState, data) { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return; + + if (prevState === null && state === "PREPARED") { + // Load our stored checkpoints, if any. + this.crawlerChekpoints = await platform.loadCheckpoints(); + console.log("Seshat: Loaded checkpoints", + this.crawlerChekpoints); + return; + } + + if (prevState === "PREPARED" && state === "SYNCING") { + const addInitialCheckpoints = async () => { + const client = MatrixClientPeg.get(); + const rooms = client.getRooms(); + + const isRoomEncrypted = (room) => { + return client.isRoomEncrypted(room.roomId); + }; + + // We only care to crawl the encrypted rooms, non-encrytped + // rooms can use the search provided by the Homeserver. + const encryptedRooms = rooms.filter(isRoomEncrypted); + + console.log("Seshat: Adding initial crawler checkpoints"); + + // Gather the prev_batch tokens and create checkpoints for + // our message crawler. + await Promise.all(encryptedRooms.map(async (room) => { + const timeline = room.getLiveTimeline(); + const token = timeline.getPaginationToken("b"); + + console.log("Seshat: Got token for indexer", + room.roomId, token); + + const backCheckpoint = { + roomId: room.roomId, + token: token, + direction: "b", + }; + + const forwardCheckpoint = { + roomId: room.roomId, + token: token, + direction: "f", + }; + + await platform.addCrawlerCheckpoint(backCheckpoint); + await platform.addCrawlerCheckpoint(forwardCheckpoint); + this.crawlerChekpoints.push(backCheckpoint); + this.crawlerChekpoints.push(forwardCheckpoint); + })); + }; + + // If our indexer is empty we're most likely running Riot the + // first time with indexing support or running it with an + // initial sync. Add checkpoints to crawl our encrypted rooms. + const eventIndexWasEmpty = await platform.isEventIndexEmpty(); + if (eventIndexWasEmpty) await addInitialCheckpoints(); + + // Start our crawler. + this.startCrawler(); + return; + } + + if (prevState === "SYNCING" && state === "SYNCING") { + // A sync was done, presumably we queued up some live events, + // commit them now. + console.log("Seshat: Committing events"); + await platform.commitLiveEvents(); + return; + } + } + + async onRoomTimeline(ev, room, toStartOfTimeline, removed, data) { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return; + + // We only index encrypted rooms locally. + if (!MatrixClientPeg.get().isRoomEncrypted(room.roomId)) return; + + // If it isn't a live event or if it's redacted there's nothing to + // do. + if (toStartOfTimeline || !data || !data.liveEvent + || ev.isRedacted()) { + return; + } + + // If the event is not yet decrypted mark it for the + // Event.decrypted callback. + if (ev.isBeingDecrypted()) { + const eventId = ev.getId(); + this.liveEventsForIndex.add(eventId); + } else { + // If the event is decrypted or is unencrypted add it to the + // index now. + await this.addLiveEventToIndex(ev); + } + } + + async onEventDecrypted(ev, err) { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return; + + const eventId = ev.getId(); + + // If the event isn't in our live event set, ignore it. + if (!this.liveEventsForIndex.delete(eventId)) return; + if (err) return; + await this.addLiveEventToIndex(ev); + } + + async addLiveEventToIndex(ev) { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return; + + if (["m.room.message", "m.room.name", "m.room.topic"] + .indexOf(ev.getType()) == -1) { + return; + } + + const e = ev.toJSON().decrypted; + const profile = { + displayname: ev.sender.rawDisplayName, + avatar_url: ev.sender.getMxcAvatarUrl(), + }; + + platform.addEventToIndex(e, profile); + } + + async crawlerFunc(handle) { + // TODO either put this in a better place or find a library provided + // method that does this. + const sleep = async (ms) => { + return new Promise(resolve => setTimeout(resolve, ms)); + }; + + let cancelled = false; + + console.log("Seshat: Started crawler function"); + + const client = MatrixClientPeg.get(); + const platform = PlatformPeg.get(); + + handle.cancel = () => { + cancelled = true; + }; + + while (!cancelled) { + // This is a low priority task and we don't want to spam our + // Homeserver with /messages requests so we set a hefty timeout + // here. + await sleep(this._crawler_timeout); + + console.log("Seshat: Running the crawler loop."); + + if (cancelled) { + console.log("Seshat: Cancelling the crawler."); + break; + } + + const checkpoint = this.crawlerChekpoints.shift(); + + /// There is no checkpoint available currently, one may appear if + // a sync with limited room timelines happens, so go back to sleep. + if (checkpoint === undefined) { + continue; + } + + console.log("Seshat: crawling using checkpoint", checkpoint); + + // We have a checkpoint, let us fetch some messages, again, very + // conservatively to not bother our Homeserver too much. + const eventMapper = client.getEventMapper(); + // TODO we need to ensure to use member lazy loading with this + // request so we get the correct profiles. + let res; + + try { + res = await client._createMessagesRequest( + checkpoint.roomId, checkpoint.token, 100, + checkpoint.direction); + } catch (e) { + console.log("Seshat: Error crawling events:", e); + this.crawlerChekpoints.push(checkpoint); + continue + } + + if (res.chunk.length === 0) { + console.log("Seshat: Done with the checkpoint", checkpoint); + // We got to the start/end of our timeline, lets just + // delete our checkpoint and go back to sleep. + await platform.removeCrawlerCheckpoint(checkpoint); + continue; + } + + // Convert the plain JSON events into Matrix events so they get + // decrypted if necessary. + const matrixEvents = res.chunk.map(eventMapper); + let stateEvents = []; + if (res.state !== undefined) { + stateEvents = res.state.map(eventMapper); + } + + const profiles = {}; + + stateEvents.forEach(ev => { + if (ev.event.content && + ev.event.content.membership === "join") { + profiles[ev.event.sender] = { + displayname: ev.event.content.displayname, + avatar_url: ev.event.content.avatar_url, + }; + } + }); + + const decryptionPromises = []; + + matrixEvents.forEach(ev => { + if (ev.isBeingDecrypted() || ev.isDecryptionFailure()) { + // TODO the decryption promise is a private property, this + // should either be made public or we should convert the + // event that gets fired when decryption is done into a + // promise using the once event emitter method: + // https://nodejs.org/api/events.html#events_events_once_emitter_name + decryptionPromises.push(ev._decryptionPromise); + } + }); + + // Let us wait for all the events to get decrypted. + await Promise.all(decryptionPromises); + + // We filter out events for which decryption failed, are redacted + // or aren't of a type that we know how to index. + const isValidEvent = (value) => { + return ([ + "m.room.message", + "m.room.name", + "m.room.topic", + ].indexOf(value.getType()) >= 0 + && !value.isRedacted() && !value.isDecryptionFailure() + ); + // TODO do we need to check if the event has all the valid + // attributes? + }; + + // TODO if there ar no events at this point we're missing a lot + // decryption keys, do we wan't to retry this checkpoint at a later + // stage? + const filteredEvents = matrixEvents.filter(isValidEvent); + + // Let us convert the events back into a format that Seshat can + // consume. + const events = filteredEvents.map((ev) => { + const jsonEvent = ev.toJSON(); + + let e; + if (ev.isEncrypted()) e = jsonEvent.decrypted; + else e = jsonEvent; + + let profile = {}; + if (e.sender in profiles) profile = profiles[e.sender]; + const object = { + event: e, + profile: profile, + }; + return object; + }); + + // Create a new checkpoint so we can continue crawling the room for + // messages. + const newCheckpoint = { + roomId: checkpoint.roomId, + token: res.end, + fullCrawl: checkpoint.fullCrawl, + direction: checkpoint.direction, + }; + + console.log( + "Seshat: Crawled room", + client.getRoom(checkpoint.roomId).name, + "and fetched", events.length, "events.", + ); + + try { + const eventsAlreadyAdded = await platform.addHistoricEvents( + events, newCheckpoint, checkpoint); + // If all events were already indexed we assume that we catched + // up with our index and don't need to crawl the room further. + // Let us delete the checkpoint in that case, otherwise push + // the new checkpoint to be used by the crawler. + if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) { + console.log("Seshat: Checkpoint had already all events", + "added, stopping the crawl", checkpoint); + await platform.removeCrawlerCheckpoint(newCheckpoint); + } else { + this.crawlerChekpoints.push(newCheckpoint); + } + } catch (e) { + console.log("Seshat: Error durring a crawl", e); + // An error occured, put the checkpoint back so we + // can retry. + this.crawlerChekpoints.push(checkpoint); + } + } + + console.log("Seshat: Stopping crawler function"); + } + + async addCheckpointForLimitedRoom(roomId) { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return; + if (!MatrixClientPeg.get().isRoomEncrypted(roomId)) return; + + const client = MatrixClientPeg.get(); + const room = client.getRoom(roomId); + + if (room === null) return; + + const timeline = room.getLiveTimeline(); + const token = timeline.getPaginationToken("b"); + + const backwardsCheckpoint = { + roomId: room.roomId, + token: token, + fullCrawl: false, + direction: "b", + }; + + const forwardsCheckpoint = { + roomId: room.roomId, + token: token, + fullCrawl: false, + direction: "f", + }; + + console.log("Seshat: Added checkpoint because of a limited timeline", + backwardsCheckpoint, forwardsCheckpoint); + + await platform.addCrawlerCheckpoint(backwardsCheckpoint); + await platform.addCrawlerCheckpoint(forwardsCheckpoint); + + this.crawlerChekpoints.push(backwardsCheckpoint); + this.crawlerChekpoints.push(forwardsCheckpoint); + } + + async deleteEventIndex() { + if (platform.supportsEventIndexing()) { + console.log("Seshat: Deleting event index."); + this.crawlerRef.cancel(); + await platform.deleteEventIndex(); + } + } + + startCrawler() { + const crawlerHandle = {}; + this.crawlerFunc(crawlerHandle); + this.crawlerRef = crawlerHandle; + } + + stopCrawler() { + this._crawlerRef.cancel(); + this._crawlerRef = null; + } +} diff --git a/src/Lifecycle.js b/src/Lifecycle.js index 7490c5d464..0b44f2ed84 100644 --- a/src/Lifecycle.js +++ b/src/Lifecycle.js @@ -20,6 +20,7 @@ import Promise from 'bluebird'; import Matrix from 'matrix-js-sdk'; import MatrixClientPeg from './MatrixClientPeg'; +import EventIndexPeg from './EventIndexPeg'; import createMatrixClient from './utils/createMatrixClient'; import Analytics from './Analytics'; import Notifier from './Notifier'; @@ -587,6 +588,7 @@ async function startMatrixClient(startSyncing=true) { if (startSyncing) { await MatrixClientPeg.start(); + await EventIndexPeg.init(); } else { console.warn("Caller requested only auxiliary services be started"); await MatrixClientPeg.assign(); diff --git a/src/MatrixClientPeg.js b/src/MatrixClientPeg.js index 5c5ee6e4ec..6c5b465bb0 100644 --- a/src/MatrixClientPeg.js +++ b/src/MatrixClientPeg.js @@ -31,6 +31,7 @@ import MatrixClientBackedSettingsHandler from "./settings/handlers/MatrixClientB import * as StorageManager from './utils/StorageManager'; import IdentityAuthClient from './IdentityAuthClient'; import PlatformPeg from "./PlatformPeg"; +import EventIndexPeg from "./EventIndexPeg"; interface MatrixClientCreds { homeserverUrl: string, @@ -223,9 +224,6 @@ class MatrixClientPeg { this.matrixClient = createMatrixClient(opts); - const platform = PlatformPeg.get(); - if (platform.supportsEventIndexing()) platform.initEventIndex(creds.userId); - // we're going to add eventlisteners for each matrix event tile, so the // potential number of event listeners is quite high. this.matrixClient.setMaxListeners(500); diff --git a/src/components/structures/MatrixChat.js b/src/components/structures/MatrixChat.js index 402790df98..d006247151 100644 --- a/src/components/structures/MatrixChat.js +++ b/src/components/structures/MatrixChat.js @@ -31,6 +31,7 @@ import Analytics from "../../Analytics"; import { DecryptionFailureTracker } from "../../DecryptionFailureTracker"; import MatrixClientPeg from "../../MatrixClientPeg"; import PlatformPeg from "../../PlatformPeg"; +import EventIndexPeg from "../../EventIndexPeg"; import SdkConfig from "../../SdkConfig"; import * as RoomListSorter from "../../RoomListSorter"; import dis from "../../dispatcher"; @@ -1224,12 +1225,6 @@ export default createReactClass({ _onLoggedOut: async function() { const platform = PlatformPeg.get(); - if (platform.supportsEventIndexing()) { - console.log("Seshat: Deleting event index."); - this.crawlerRef.cancel(); - await platform.deleteEventIndex(); - } - this.notifyNewScreen('login'); this.setStateForNewView({ view: VIEWS.LOGIN, @@ -1270,8 +1265,6 @@ export default createReactClass({ // to do the first sync this.firstSyncComplete = false; this.firstSyncPromise = Promise.defer(); - this.crawlerChekpoints = []; - this.liveEventsForIndex = new Set(); const cli = MatrixClientPeg.get(); const IncomingSasDialog = sdk.getComponent('views.dialogs.IncomingSasDialog'); @@ -1284,7 +1277,10 @@ export default createReactClass({ cli.setCanResetTimelineCallback(async function(roomId) { console.log("Request to reset timeline in room ", roomId, " viewing:", self.state.currentRoomId); // TODO is there a better place to plug this in - await self.addCheckpointForLimitedRoom(roomId); + const eventIndex = EventIndexPeg.get(); + if (eventIndex !== null) { + await eventIndex.addCheckpointForLimitedRoom(roomId); + } if (roomId !== self.state.currentRoomId) { // It is safe to remove events from rooms we are not viewing. @@ -1301,80 +1297,21 @@ export default createReactClass({ }); cli.on('sync', async (state, prevState, data) => { - const platform = PlatformPeg.get(); - if (!platform.supportsEventIndexing()) return; + const eventIndex = EventIndexPeg.get(); + if (eventIndex === null) return; + await eventIndex.onSync(state, prevState, data); + }); - if (prevState === null && state === "PREPARED") { - /// Load our stored checkpoints, if any. - self.crawlerChekpoints = await platform.loadCheckpoints(); - console.log("Seshat: Loaded checkpoints", - self.crawlerChekpoints); - return; - } + cli.on("Room.timeline", async (ev, room, toStartOfTimeline, removed, data) => { + const eventIndex = EventIndexPeg.get(); + if (eventIndex === null) return; + await eventIndex.onRoomTimeline(ev, room, toStartOfTimeline, removed, data); + }); - if (prevState === "PREPARED" && state === "SYNCING") { - const addInitialCheckpoints = async () => { - const client = MatrixClientPeg.get(); - const rooms = client.getRooms(); - - const isRoomEncrypted = (room) => { - return client.isRoomEncrypted(room.roomId); - }; - - // We only care to crawl the encrypted rooms, non-encrytped - // rooms can use the search provided by the Homeserver. - const encryptedRooms = rooms.filter(isRoomEncrypted); - - console.log("Seshat: Adding initial crawler checkpoints"); - - // Gather the prev_batch tokens and create checkpoints for - // our message crawler. - await Promise.all(encryptedRooms.map(async (room) => { - const timeline = room.getLiveTimeline(); - const token = timeline.getPaginationToken("b"); - - console.log("Seshat: Got token for indexer", - room.roomId, token); - - const backCheckpoint = { - roomId: room.roomId, - token: token, - direction: "b", - }; - - const forwardCheckpoint = { - roomId: room.roomId, - token: token, - direction: "f", - }; - - await platform.addCrawlerCheckpoint(backCheckpoint); - await platform.addCrawlerCheckpoint(forwardCheckpoint); - self.crawlerChekpoints.push(backCheckpoint); - self.crawlerChekpoints.push(forwardCheckpoint); - })); - }; - - // If our indexer is empty we're most likely running Riot the - // first time with indexing support or running it with an - // initial sync. Add checkpoints to crawl our encrypted rooms. - const eventIndexWasEmpty = await platform.isEventIndexEmpty(); - if (eventIndexWasEmpty) await addInitialCheckpoints(); - - // Start our crawler. - const crawlerHandle = {}; - self.crawlerFunc(crawlerHandle); - self.crawlerRef = crawlerHandle; - return; - } - - if (prevState === "SYNCING" && state === "SYNCING") { - // A sync was done, presumably we queued up some live events, - // commit them now. - console.log("Seshat: Committing events"); - await platform.commitLiveEvents(); - return; - } + cli.on("Event.decrypted", async (ev, err) => { + const eventIndex = EventIndexPeg.get(); + if (eventIndex === null) return; + await eventIndex.onEventDecrypted(ev, err); }); cli.on('sync', function(state, prevState, data) { @@ -1459,44 +1396,6 @@ export default createReactClass({ }, null, true); }); - cli.on("Room.timeline", async (ev, room, toStartOfTimeline, removed, data) => { - const platform = PlatformPeg.get(); - if (!platform.supportsEventIndexing()) return; - - // We only index encrypted rooms locally. - if (!MatrixClientPeg.get().isRoomEncrypted(room.roomId)) return; - - // If it isn't a live event or if it's redacted there's nothing to - // do. - if (toStartOfTimeline || !data || !data.liveEvent - || ev.isRedacted()) { - return; - } - - // If the event is not yet decrypted mark it for the - // Event.decrypted callback. - if (ev.isBeingDecrypted()) { - const eventId = ev.getId(); - self.liveEventsForIndex.add(eventId); - } else { - // If the event is decrypted or is unencrypted add it to the - // index now. - await self.addLiveEventToIndex(ev); - } - }); - - cli.on("Event.decrypted", async (ev, err) => { - const platform = PlatformPeg.get(); - if (!platform.supportsEventIndexing()) return; - - const eventId = ev.getId(); - - // If the event isn't in our live event set, ignore it. - if (!self.liveEventsForIndex.delete(eventId)) return; - if (err) return; - await self.addLiveEventToIndex(ev); - }); - cli.on("accountData", function(ev) { if (ev.getType() === 'im.vector.web.settings') { if (ev.getContent() && ev.getContent().theme) { @@ -2058,238 +1957,4 @@ export default createReactClass({ {view} ; }, - - async addLiveEventToIndex(ev) { - const platform = PlatformPeg.get(); - if (!platform.supportsEventIndexing()) return; - - if (["m.room.message", "m.room.name", "m.room.topic"] - .indexOf(ev.getType()) == -1) { - return; - } - - const e = ev.toJSON().decrypted; - const profile = { - displayname: ev.sender.rawDisplayName, - avatar_url: ev.sender.getMxcAvatarUrl(), - }; - - platform.addEventToIndex(e, profile); - }, - - async crawlerFunc(handle) { - // TODO either put this in a better place or find a library provided - // method that does this. - const sleep = async (ms) => { - return new Promise(resolve => setTimeout(resolve, ms)); - }; - - let cancelled = false; - - console.log("Seshat: Started crawler function"); - - const client = MatrixClientPeg.get(); - const platform = PlatformPeg.get(); - - handle.cancel = () => { - cancelled = true; - }; - - while (!cancelled) { - // This is a low priority task and we don't want to spam our - // Homeserver with /messages requests so we set a hefty 3s timeout - // here. - await sleep(3000); - - console.log("Seshat: Running the crawler loop."); - - if (cancelled) { - console.log("Seshat: Cancelling the crawler."); - break; - } - - const checkpoint = this.crawlerChekpoints.shift(); - - /// There is no checkpoint available currently, one may appear if - // a sync with limited room timelines happens, so go back to sleep. - if (checkpoint === undefined) { - continue; - } - - console.log("Seshat: crawling using checkpoint", checkpoint); - - // We have a checkpoint, let us fetch some messages, again, very - // conservatively to not bother our Homeserver too much. - const eventMapper = client.getEventMapper(); - // TODO we need to ensure to use member lazy loading with this - // request so we get the correct profiles. - let res; - - try { - res = await client._createMessagesRequest( - checkpoint.roomId, checkpoint.token, 100, - checkpoint.direction); - } catch (e) { - console.log("Seshat: Error crawling events:", e); - this.crawlerChekpoints.push(checkpoint); - continue - } - - if (res.chunk.length === 0) { - console.log("Seshat: Done with the checkpoint", checkpoint); - // We got to the start/end of our timeline, lets just - // delete our checkpoint and go back to sleep. - await platform.removeCrawlerCheckpoint(checkpoint); - continue; - } - - // Convert the plain JSON events into Matrix events so they get - // decrypted if necessary. - const matrixEvents = res.chunk.map(eventMapper); - let stateEvents = []; - if (res.state !== undefined) { - stateEvents = res.state.map(eventMapper); - } - - const profiles = {}; - - stateEvents.forEach(ev => { - if (ev.event.content && - ev.event.content.membership === "join") { - profiles[ev.event.sender] = { - displayname: ev.event.content.displayname, - avatar_url: ev.event.content.avatar_url, - }; - } - }); - - const decryptionPromises = []; - - matrixEvents.forEach(ev => { - if (ev.isBeingDecrypted() || ev.isDecryptionFailure()) { - // TODO the decryption promise is a private property, this - // should either be made public or we should convert the - // event that gets fired when decryption is done into a - // promise using the once event emitter method: - // https://nodejs.org/api/events.html#events_events_once_emitter_name - decryptionPromises.push(ev._decryptionPromise); - } - }); - - // Let us wait for all the events to get decrypted. - await Promise.all(decryptionPromises); - - // We filter out events for which decryption failed, are redacted - // or aren't of a type that we know how to index. - const isValidEvent = (value) => { - return ([ - "m.room.message", - "m.room.name", - "m.room.topic", - ].indexOf(value.getType()) >= 0 - && !value.isRedacted() && !value.isDecryptionFailure() - ); - // TODO do we need to check if the event has all the valid - // attributes? - }; - - // TODO if there ar no events at this point we're missing a lot - // decryption keys, do we wan't to retry this checkpoint at a later - // stage? - const filteredEvents = matrixEvents.filter(isValidEvent); - - // Let us convert the events back into a format that Seshat can - // consume. - const events = filteredEvents.map((ev) => { - const jsonEvent = ev.toJSON(); - - let e; - if (ev.isEncrypted()) e = jsonEvent.decrypted; - else e = jsonEvent; - - let profile = {}; - if (e.sender in profiles) profile = profiles[e.sender]; - const object = { - event: e, - profile: profile, - }; - return object; - }); - - // Create a new checkpoint so we can continue crawling the room for - // messages. - const newCheckpoint = { - roomId: checkpoint.roomId, - token: res.end, - fullCrawl: checkpoint.fullCrawl, - direction: checkpoint.direction, - }; - - console.log( - "Seshat: Crawled room", - client.getRoom(checkpoint.roomId).name, - "and fetched", events.length, "events.", - ); - - try { - const eventsAlreadyAdded = await platform.addHistoricEvents( - events, newCheckpoint, checkpoint); - // If all events were already indexed we assume that we catched - // up with our index and don't need to crawl the room further. - // Let us delete the checkpoint in that case, otherwise push - // the new checkpoint to be used by the crawler. - if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) { - console.log("Seshat: Checkpoint had already all events", - "added, stopping the crawl", checkpoint); - await platform.removeCrawlerCheckpoint(newCheckpoint); - } else { - this.crawlerChekpoints.push(newCheckpoint); - } - } catch (e) { - console.log("Seshat: Error durring a crawl", e); - // An error occured, put the checkpoint back so we - // can retry. - this.crawlerChekpoints.push(checkpoint); - } - } - - console.log("Seshat: Stopping crawler function"); - }, - - async addCheckpointForLimitedRoom(roomId) { - const platform = PlatformPeg.get(); - if (!platform.supportsEventIndexing()) return; - if (!MatrixClientPeg.get().isRoomEncrypted(roomId)) return; - - const client = MatrixClientPeg.get(); - const room = client.getRoom(roomId); - - if (room === null) return; - - const timeline = room.getLiveTimeline(); - const token = timeline.getPaginationToken("b"); - - const backwardsCheckpoint = { - roomId: room.roomId, - token: token, - fullCrawl: false, - direction: "b", - }; - - const forwardsCheckpoint = { - roomId: room.roomId, - token: token, - fullCrawl: false, - direction: "f", - }; - - console.log("Seshat: Added checkpoint because of a limited timeline", - backwardsCheckpoint, forwardsCheckpoint); - - await platform.addCrawlerCheckpoint(backwardsCheckpoint); - await platform.addCrawlerCheckpoint(forwardsCheckpoint); - - this.crawlerChekpoints.push(backwardsCheckpoint); - this.crawlerChekpoints.push(forwardsCheckpoint); - }, });