[feature] Prune timelines once per hour to plug memory leak (#1117)

* export highest/lowest ULIDs as proper const

* add stop + start to timeline manager, other small fixes

* unexport unused interface funcs + tidy up

* add LastGot func

* add timeline Prune function

* test prune

* update lastGot
This commit is contained in:
tobi 2022-11-22 19:38:10 +01:00 committed by GitHub
parent 90bbcf1bcf
commit 50dc179d33
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 594 additions and 602 deletions

View file

@ -91,7 +91,7 @@ func (suite *NotificationTestSuite) TestGetNotificationsWithSpam() {
suite.spamNotifs()
testAccount := suite.testAccounts["local_account_1"]
before := time.Now()
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000")
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest)
suite.NoError(err)
timeTaken := time.Since(before)
fmt.Printf("\n\n\n withSpam: got %d notifications in %s\n\n\n", len(notifications), timeTaken)
@ -105,7 +105,7 @@ func (suite *NotificationTestSuite) TestGetNotificationsWithSpam() {
func (suite *NotificationTestSuite) TestGetNotificationsWithoutSpam() {
testAccount := suite.testAccounts["local_account_1"]
before := time.Now()
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000")
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest)
suite.NoError(err)
timeTaken := time.Since(before)
fmt.Printf("\n\n\n withoutSpam: got %d notifications in %s\n\n\n", len(notifications), timeTaken)
@ -122,7 +122,7 @@ func (suite *NotificationTestSuite) TestClearNotificationsWithSpam() {
err := suite.db.ClearNotifications(context.Background(), testAccount.ID)
suite.NoError(err)
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000")
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest)
suite.NoError(err)
suite.NotNil(notifications)
suite.Empty(notifications)
@ -134,7 +134,7 @@ func (suite *NotificationTestSuite) TestClearNotificationsWithTwoAccounts() {
err := suite.db.ClearNotifications(context.Background(), testAccount.ID)
suite.NoError(err)
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000")
notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest)
suite.NoError(err)
suite.NotNil(notifications)
suite.Empty(notifications)

View file

@ -1,3 +1,21 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package id
import (
@ -8,7 +26,11 @@ import (
"github.com/oklog/ulid"
)
const randomRange = 631152381 // ~20 years in seconds
const (
Highest = "ZZZZZZZZZZZZZZZZZZZZZZZZZZ" // Highest is the highest possible ULID
Lowest = "00000000000000000000000000" // Lowest is the lowest possible ULID
randomRange = 631152381 // ~20 years in seconds
)
// ULID represents a Universally Unique Lexicographically Sortable Identifier of 26 characters. See https://github.com/oklog/ulid
type ULID string

View file

@ -351,6 +351,11 @@ func (p *processor) Start() error {
return err
}
// Start status timelines
if err := p.statusTimelines.Start(); err != nil {
return err
}
return nil
}
@ -359,8 +364,14 @@ func (p *processor) Stop() error {
if err := p.clientWorker.Stop(); err != nil {
return err
}
if err := p.fedWorker.Stop(); err != nil {
return err
}
if err := p.statusTimelines.Stop(); err != nil {
return err
}
return nil
}

View file

@ -23,6 +23,7 @@ import (
"context"
"errors"
"fmt"
"time"
"codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/log"
@ -30,16 +31,27 @@ import (
const retries = 5
func (t *timeline) LastGot() time.Time {
t.Lock()
defer t.Unlock()
return t.lastGot
}
func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID string, minID string, prepareNext bool) ([]Preparable, error) {
l := log.WithFields(kv.Fields{
{"accountID", t.accountID},
{"amount", amount},
{"maxID", maxID},
{"sinceID", sinceID},
{"minID", minID},
}...)
l.Debug("entering get")
l.Debug("entering get and updating t.lastGot")
// regardless of what happens below, update the
// last time Get was called for this timeline
t.Lock()
t.lastGot = time.Now()
t.Unlock()
var items []Preparable
var err error
@ -47,7 +59,7 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st
// no params are defined to just fetch from the top
// this is equivalent to a user asking for the top x items from their timeline
if maxID == "" && sinceID == "" && minID == "" {
items, err = t.GetXFromTop(ctx, amount)
items, err = t.getXFromTop(ctx, amount)
// aysnchronously prepare the next predicted query so it's ready when the user asks for it
if len(items) != 0 {
nextMaxID := items[len(items)-1].GetID()
@ -67,7 +79,7 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st
// this is equivalent to a user asking for the next x items from their timeline, starting from maxID
if maxID != "" && sinceID == "" {
attempts := 0
items, err = t.GetXBehindID(ctx, amount, maxID, &attempts)
items, err = t.getXBehindID(ctx, amount, maxID, &attempts)
// aysnchronously prepare the next predicted query so it's ready when the user asks for it
if len(items) != 0 {
nextMaxID := items[len(items)-1].GetID()
@ -86,25 +98,26 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st
// maxID is defined and sinceID || minID are as well, so take a slice between them
// this is equivalent to a user asking for items older than x but newer than y
if maxID != "" && sinceID != "" {
items, err = t.GetXBetweenID(ctx, amount, maxID, minID)
items, err = t.getXBetweenID(ctx, amount, maxID, minID)
}
if maxID != "" && minID != "" {
items, err = t.GetXBetweenID(ctx, amount, maxID, minID)
items, err = t.getXBetweenID(ctx, amount, maxID, minID)
}
// maxID isn't defined, but sinceID || minID are, so take x before
// this is equivalent to a user asking for items newer than x (eg., refreshing the top of their timeline)
if maxID == "" && sinceID != "" {
items, err = t.GetXBeforeID(ctx, amount, sinceID, true)
items, err = t.getXBeforeID(ctx, amount, sinceID, true)
}
if maxID == "" && minID != "" {
items, err = t.GetXBeforeID(ctx, amount, minID, true)
items, err = t.getXBeforeID(ctx, amount, minID, true)
}
return items, err
}
func (t *timeline) GetXFromTop(ctx context.Context, amount int) ([]Preparable, error) {
// getXFromTop returns x amount of items from the top of the timeline, from newest to oldest.
func (t *timeline) getXFromTop(ctx context.Context, amount int) ([]Preparable, error) {
// make a slice of preparedItems with the length we need to return
preparedItems := make([]Preparable, 0, amount)
@ -124,7 +137,7 @@ func (t *timeline) GetXFromTop(ctx context.Context, amount int) ([]Preparable, e
for e := t.preparedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXFromTop: could not parse e as a preparedItemsEntry")
return nil, errors.New("getXFromTop: could not parse e as a preparedItemsEntry")
}
preparedItems = append(preparedItems, entry.prepared)
served++
@ -136,9 +149,12 @@ func (t *timeline) GetXFromTop(ctx context.Context, amount int) ([]Preparable, e
return preparedItems, nil
}
func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string, attempts *int) ([]Preparable, error) {
// getXBehindID returns x amount of items from the given id onwards, from newest to oldest.
// This will NOT include the item with the given ID.
//
// This corresponds to an api call to /timelines/home?max_id=WHATEVER
func (t *timeline) getXBehindID(ctx context.Context, amount int, behindID string, attempts *int) ([]Preparable, error) {
l := log.WithFields(kv.Fields{
{"amount", amount},
{"behindID", behindID},
{"attempts", attempts},
@ -164,7 +180,7 @@ findMarkLoop:
position++
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBehindID: could not parse e as a preparedPostsEntry")
}
if entry.itemID <= behindID {
@ -177,10 +193,10 @@ findMarkLoop:
// we didn't find it, so we need to make sure it's indexed and prepared and then try again
// this can happen when a user asks for really old items
if behindIDMark == nil {
if err := t.PrepareBehind(ctx, behindID, amount); err != nil {
return nil, fmt.Errorf("GetXBehindID: error preparing behind and including ID %s", behindID)
if err := t.prepareBehind(ctx, behindID, amount); err != nil {
return nil, fmt.Errorf("getXBehindID: error preparing behind and including ID %s", behindID)
}
oldestID, err := t.OldestPreparedItemID(ctx)
oldestID, err := t.oldestPreparedItemID(ctx)
if err != nil {
return nil, err
}
@ -196,13 +212,13 @@ findMarkLoop:
l.Tracef("exceeded retries looking for behindID %s", behindID)
return items, nil
}
l.Trace("trying GetXBehindID again")
return t.GetXBehindID(ctx, amount, behindID, attempts)
l.Trace("trying getXBehindID again")
return t.getXBehindID(ctx, amount, behindID, attempts)
}
// make sure we have enough items prepared behind it to return what we're being asked for
if t.preparedItems.data.Len() < amount+position {
if err := t.PrepareBehind(ctx, behindID, amount); err != nil {
if err := t.prepareBehind(ctx, behindID, amount); err != nil {
return nil, err
}
}
@ -213,7 +229,7 @@ serveloop:
for e := behindIDMark.Next(); e != nil; e = e.Next() {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBehindID: could not parse e as a preparedPostsEntry")
}
// serve up to the amount requested
@ -227,7 +243,11 @@ serveloop:
return items, nil
}
func (t *timeline) GetXBeforeID(ctx context.Context, amount int, beforeID string, startFromTop bool) ([]Preparable, error) {
// getXBeforeID returns x amount of items up to the given id, from newest to oldest.
// This will NOT include the item with the given ID.
//
// This corresponds to an api call to /timelines/home?since_id=WHATEVER
func (t *timeline) getXBeforeID(ctx context.Context, amount int, beforeID string, startFromTop bool) ([]Preparable, error) {
// make a slice of items with the length we need to return
items := make([]Preparable, 0, amount)
@ -241,7 +261,7 @@ findMarkLoop:
for e := t.preparedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry")
}
if entry.itemID >= beforeID {
@ -263,7 +283,7 @@ findMarkLoop:
for e := t.preparedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry")
}
if entry.itemID == beforeID {
@ -283,7 +303,7 @@ findMarkLoop:
for e := beforeIDMark.Prev(); e != nil; e = e.Prev() {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry")
}
// serve up to the amount requested
@ -298,7 +318,11 @@ findMarkLoop:
return items, nil
}
func (t *timeline) GetXBetweenID(ctx context.Context, amount int, behindID string, beforeID string) ([]Preparable, error) {
// getXBetweenID returns x amount of items from the given maxID, up to the given id, from newest to oldest.
// This will NOT include the item with the given IDs.
//
// This corresponds to an api call to /timelines/home?since_id=WHATEVER&max_id=WHATEVER_ELSE
func (t *timeline) getXBetweenID(ctx context.Context, amount int, behindID string, beforeID string) ([]Preparable, error) {
// make a slice of items with the length we need to return
items := make([]Preparable, 0, amount)
@ -314,7 +338,7 @@ findMarkLoop:
position++
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBetweenID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBetweenID: could not parse e as a preparedPostsEntry")
}
if entry.itemID == behindID {
@ -325,12 +349,12 @@ findMarkLoop:
// we didn't find it
if behindIDMark == nil {
return nil, fmt.Errorf("GetXBetweenID: couldn't find item with ID %s", behindID)
return nil, fmt.Errorf("getXBetweenID: couldn't find item with ID %s", behindID)
}
// make sure we have enough items prepared behind it to return what we're being asked for
if t.preparedItems.data.Len() < amount+position {
if err := t.PrepareBehind(ctx, behindID, amount); err != nil {
if err := t.prepareBehind(ctx, behindID, amount); err != nil {
return nil, err
}
}
@ -341,7 +365,7 @@ serveloop:
for e := behindIDMark.Next(); e != nil; e = e.Next() {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return nil, errors.New("GetXBetweenID: could not parse e as a preparedPostsEntry")
return nil, errors.New("getXBetweenID: could not parse e as a preparedPostsEntry")
}
if entry.itemID == beforeID {

View file

@ -89,6 +89,9 @@ func (suite *GetTestSuite) TearDownTest() {
}
func (suite *GetTestSuite) TestGetDefault() {
// lastGot should be zero
suite.Zero(suite.timeline.LastGot())
// get 10 20 the top and don't prepare the next query
statuses, err := suite.timeline.Get(context.Background(), 20, "", "", "", false)
if err != nil {
@ -108,6 +111,9 @@ func (suite *GetTestSuite) TestGetDefault() {
highest = s.GetID()
}
}
// lastGot should be up to date
suite.WithinDuration(time.Now(), suite.timeline.LastGot(), 1*time.Second)
}
func (suite *GetTestSuite) TestGetDefaultPrepareNext() {
@ -297,165 +303,6 @@ func (suite *GetTestSuite) TestGetBetweenIDPrepareNext() {
time.Sleep(1 * time.Second)
}
func (suite *GetTestSuite) TestGetXFromTop() {
// get 5 from the top
statuses, err := suite.timeline.GetXFromTop(context.Background(), 5)
if err != nil {
suite.FailNow(err.Error())
}
suite.Len(statuses, 5)
// statuses should be sorted highest to lowest ID
var highest string
for i, s := range statuses {
if i == 0 {
highest = s.GetID()
} else {
suite.Less(s.GetID(), highest)
highest = s.GetID()
}
}
}
func (suite *GetTestSuite) TestGetXBehindID() {
// get 3 behind the 'middle' id
var attempts *int
a := 0
attempts = &a
statuses, err := suite.timeline.GetXBehindID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MA", attempts)
if err != nil {
suite.FailNow(err.Error())
}
suite.Len(statuses, 3)
// statuses should be sorted highest to lowest ID
// all status IDs should be less than the behindID
var highest string
for i, s := range statuses {
if i == 0 {
highest = s.GetID()
} else {
suite.Less(s.GetID(), highest)
highest = s.GetID()
}
suite.Less(s.GetID(), "01F8MHBQCBTDKN6X5VHGMMN4MA")
}
}
func (suite *GetTestSuite) TestGetXBehindID0() {
// try to get behind 0, the lowest possible ID
var attempts *int
a := 0
attempts = &a
statuses, err := suite.timeline.GetXBehindID(context.Background(), 3, "0", attempts)
if err != nil {
suite.FailNow(err.Error())
}
// there's nothing beyond it so len should be 0
suite.Len(statuses, 0)
}
func (suite *GetTestSuite) TestGetXBehindNonexistentReasonableID() {
// try to get behind an id that doesn't exist, but is close to one that does so we should still get statuses back
var attempts *int
a := 0
attempts = &a
statuses, err := suite.timeline.GetXBehindID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MB", attempts) // change the last A to a B
if err != nil {
suite.FailNow(err.Error())
}
suite.Len(statuses, 3)
// statuses should be sorted highest to lowest ID
// all status IDs should be less than the behindID
var highest string
for i, s := range statuses {
if i == 0 {
highest = s.GetID()
} else {
suite.Less(s.GetID(), highest)
highest = s.GetID()
}
suite.Less(s.GetID(), "01F8MHBCN8120SYH7D5S050MGK")
}
}
func (suite *GetTestSuite) TestGetXBehindVeryHighID() {
// try to get behind an id that doesn't exist, and is higher than any other ID we could possibly have
var attempts *int
a := 0
attempts = &a
statuses, err := suite.timeline.GetXBehindID(context.Background(), 7, "9998MHBQCBTDKN6X5VHGMMN4MA", attempts)
if err != nil {
suite.FailNow(err.Error())
}
// we should get all 7 statuses we asked for because they all have lower IDs than the very high ID given in the query
suite.Len(statuses, 7)
// statuses should be sorted highest to lowest ID
// all status IDs should be less than the behindID
var highest string
for i, s := range statuses {
if i == 0 {
highest = s.GetID()
} else {
suite.Less(s.GetID(), highest)
highest = s.GetID()
}
suite.Less(s.GetID(), "9998MHBQCBTDKN6X5VHGMMN4MA")
}
}
func (suite *GetTestSuite) TestGetXBeforeID() {
// get 3 before the 'middle' id
statuses, err := suite.timeline.GetXBeforeID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MA", true)
if err != nil {
suite.FailNow(err.Error())
}
suite.Len(statuses, 3)
// statuses should be sorted highest to lowest ID
// all status IDs should be greater than the beforeID
var highest string
for i, s := range statuses {
if i == 0 {
highest = s.GetID()
} else {
suite.Less(s.GetID(), highest)
highest = s.GetID()
}
suite.Greater(s.GetID(), "01F8MHBQCBTDKN6X5VHGMMN4MA")
}
}
func (suite *GetTestSuite) TestGetXBeforeIDNoStartFromTop() {
// get 3 before the 'middle' id
statuses, err := suite.timeline.GetXBeforeID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MA", false)
if err != nil {
suite.FailNow(err.Error())
}
suite.Len(statuses, 3)
// statuses should be sorted lowest to highest ID
// all status IDs should be greater than the beforeID
var lowest string
for i, s := range statuses {
if i == 0 {
lowest = s.GetID()
} else {
suite.Greater(s.GetID(), lowest)
lowest = s.GetID()
}
suite.Greater(s.GetID(), "01F8MHBQCBTDKN6X5VHGMMN4MA")
}
}
func TestGetTestSuite(t *testing.T) {
suite.Run(t, new(GetTestSuite))
}

View file

@ -28,79 +28,80 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/log"
)
func (t *timeline) IndexBefore(ctx context.Context, itemID string, amount int) error {
l := log.WithFields(kv.Fields{
{"amount", amount},
}...)
// lazily initialize index if it hasn't been done already
if t.itemIndex.data == nil {
t.itemIndex.data = &list.List{}
t.itemIndex.data.Init()
func (t *timeline) ItemIndexLength(ctx context.Context) int {
if t.indexedItems == nil || t.indexedItems.data == nil {
return 0
}
toIndex := []Timelineable{}
offsetID := itemID
l.Trace("entering grabloop")
grabloop:
for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only
// first grab items using the caller-provided grab function
l.Trace("grabbing...")
items, stop, err := t.grabFunction(ctx, t.accountID, "", "", offsetID, amount)
if err != nil {
return err
}
if stop {
break grabloop
}
l.Trace("filtering...")
// now filter each item using the caller-provided filter function
for _, item := range items {
shouldIndex, err := t.filterFunction(ctx, t.accountID, item)
if err != nil {
return err
}
if shouldIndex {
toIndex = append(toIndex, item)
}
offsetID = item.GetID()
}
}
l.Trace("left grabloop")
// index the items we got
for _, s := range toIndex {
if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil {
return fmt.Errorf("IndexBehind: error indexing item with id %s: %s", s.GetID(), err)
}
}
return nil
return t.indexedItems.data.Len()
}
func (t *timeline) IndexBehind(ctx context.Context, itemID string, amount int) error {
l := log.WithFields(kv.Fields{
// func (t *timeline) indexBefore(ctx context.Context, itemID string, amount int) error {
// l := log.WithFields(kv.Fields{{"amount", amount}}...)
{"amount", amount},
}...)
// // lazily initialize index if it hasn't been done already
// if t.indexedItems.data == nil {
// t.indexedItems.data = &list.List{}
// t.indexedItems.data.Init()
// }
// toIndex := []Timelineable{}
// offsetID := itemID
// l.Trace("entering grabloop")
// grabloop:
// for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only
// // first grab items using the caller-provided grab function
// l.Trace("grabbing...")
// items, stop, err := t.grabFunction(ctx, t.accountID, "", "", offsetID, amount)
// if err != nil {
// return err
// }
// if stop {
// break grabloop
// }
// l.Trace("filtering...")
// // now filter each item using the caller-provided filter function
// for _, item := range items {
// shouldIndex, err := t.filterFunction(ctx, t.accountID, item)
// if err != nil {
// return err
// }
// if shouldIndex {
// toIndex = append(toIndex, item)
// }
// offsetID = item.GetID()
// }
// }
// l.Trace("left grabloop")
// // index the items we got
// for _, s := range toIndex {
// if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil {
// return fmt.Errorf("indexBefore: error indexing item with id %s: %s", s.GetID(), err)
// }
// }
// return nil
// }
func (t *timeline) indexBehind(ctx context.Context, itemID string, amount int) error {
l := log.WithFields(kv.Fields{{"amount", amount}}...)
// lazily initialize index if it hasn't been done already
if t.itemIndex.data == nil {
t.itemIndex.data = &list.List{}
t.itemIndex.data.Init()
if t.indexedItems.data == nil {
t.indexedItems.data = &list.List{}
t.indexedItems.data.Init()
}
// If we're already indexedBehind given itemID by the required amount, we can return nil.
// First find position of itemID (or as near as possible).
var position int
positionLoop:
for e := t.itemIndex.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*itemIndexEntry)
for e := t.indexedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return errors.New("IndexBehind: could not parse e as an itemIndexEntry")
return errors.New("indexBehind: could not parse e as an itemIndexEntry")
}
if entry.itemID <= itemID {
@ -111,7 +112,7 @@ positionLoop:
}
// now check if the length of indexed items exceeds the amount of items required (position of itemID, plus amount of posts requested after that)
if t.itemIndex.data.Len() > position+amount {
if t.indexedItems.data.Len() > position+amount {
// we have enough indexed behind already to satisfy amount, so don't need to make db calls
l.Trace("returning nil since we already have enough items indexed")
return nil
@ -151,7 +152,7 @@ grabloop:
// index the items we got
for _, s := range toIndex {
if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil {
return fmt.Errorf("IndexBehind: error indexing item with id %s: %s", s.GetID(), err)
return fmt.Errorf("indexBehind: error indexing item with id %s: %s", s.GetID(), err)
}
}
@ -162,28 +163,28 @@ func (t *timeline) IndexOne(ctx context.Context, itemID string, boostOfID string
t.Lock()
defer t.Unlock()
postIndexEntry := &itemIndexEntry{
postIndexEntry := &indexedItemsEntry{
itemID: itemID,
boostOfID: boostOfID,
accountID: accountID,
boostOfAccountID: boostOfAccountID,
}
return t.itemIndex.insertIndexed(ctx, postIndexEntry)
return t.indexedItems.insertIndexed(ctx, postIndexEntry)
}
func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) {
t.Lock()
defer t.Unlock()
postIndexEntry := &itemIndexEntry{
postIndexEntry := &indexedItemsEntry{
itemID: statusID,
boostOfID: boostOfID,
accountID: accountID,
boostOfAccountID: boostOfAccountID,
}
inserted, err := t.itemIndex.insertIndexed(ctx, postIndexEntry)
inserted, err := t.indexedItems.insertIndexed(ctx, postIndexEntry)
if err != nil {
return inserted, fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err)
}
@ -199,13 +200,13 @@ func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boos
func (t *timeline) OldestIndexedItemID(ctx context.Context) (string, error) {
var id string
if t.itemIndex == nil || t.itemIndex.data == nil || t.itemIndex.data.Back() == nil {
if t.indexedItems == nil || t.indexedItems.data == nil || t.indexedItems.data.Back() == nil {
// return an empty string if postindex hasn't been initialized yet
return id, nil
}
e := t.itemIndex.data.Back()
entry, ok := e.Value.(*itemIndexEntry)
e := t.indexedItems.data.Back()
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return id, errors.New("OldestIndexedItemID: could not parse e as itemIndexEntry")
}
@ -214,13 +215,13 @@ func (t *timeline) OldestIndexedItemID(ctx context.Context) (string, error) {
func (t *timeline) NewestIndexedItemID(ctx context.Context) (string, error) {
var id string
if t.itemIndex == nil || t.itemIndex.data == nil || t.itemIndex.data.Front() == nil {
if t.indexedItems == nil || t.indexedItems.data == nil || t.indexedItems.data.Front() == nil {
// return an empty string if postindex hasn't been initialized yet
return id, nil
}
e := t.itemIndex.data.Front()
entry, ok := e.Value.(*itemIndexEntry)
e := t.indexedItems.data.Front()
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return id, errors.New("NewestIndexedItemID: could not parse e as itemIndexEntry")
}

View file

@ -69,63 +69,6 @@ func (suite *IndexTestSuite) TearDownTest() {
testrig.StandardDBTeardown(suite.db)
}
func (suite *IndexTestSuite) TestIndexBeforeLowID() {
// index 10 before the lowest status ID possible
err := suite.timeline.IndexBefore(context.Background(), "00000000000000000000000000", 10)
suite.NoError(err)
postID, err := suite.timeline.OldestIndexedItemID(context.Background())
suite.NoError(err)
suite.Equal("01F8MHBQCBTDKN6X5VHGMMN4MA", postID)
indexLength := suite.timeline.ItemIndexLength(context.Background())
suite.Equal(10, indexLength)
}
func (suite *IndexTestSuite) TestIndexBeforeHighID() {
// index 10 before the highest status ID possible
err := suite.timeline.IndexBefore(context.Background(), "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", 10)
suite.NoError(err)
// the oldest indexed post should be empty
postID, err := suite.timeline.OldestIndexedItemID(context.Background())
suite.NoError(err)
suite.Empty(postID)
// indexLength should be 0
indexLength := suite.timeline.ItemIndexLength(context.Background())
suite.Equal(0, indexLength)
}
func (suite *IndexTestSuite) TestIndexBehindHighID() {
// index 10 behind the highest status ID possible
err := suite.timeline.IndexBehind(context.Background(), "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", 10)
suite.NoError(err)
// the newest indexed post should be the highest one we have in our testrig
postID, err := suite.timeline.NewestIndexedItemID(context.Background())
suite.NoError(err)
suite.Equal("01G36SF3V6Y6V5BF9P4R7PQG7G", postID)
indexLength := suite.timeline.ItemIndexLength(context.Background())
suite.Equal(10, indexLength)
}
func (suite *IndexTestSuite) TestIndexBehindLowID() {
// index 10 behind the lowest status ID possible
err := suite.timeline.IndexBehind(context.Background(), "00000000000000000000000000", 10)
suite.NoError(err)
// the newest indexed post should be empty
postID, err := suite.timeline.NewestIndexedItemID(context.Background())
suite.NoError(err)
suite.Empty(postID)
// indexLength should be 0
indexLength := suite.timeline.ItemIndexLength(context.Background())
suite.Equal(0, indexLength)
}
func (suite *IndexTestSuite) TestOldestIndexedItemIDEmpty() {
// the oldest indexed post should be an empty string since there's nothing indexed yet
postID, err := suite.timeline.OldestIndexedItemID(context.Background())
@ -137,17 +80,6 @@ func (suite *IndexTestSuite) TestOldestIndexedItemIDEmpty() {
suite.Equal(0, indexLength)
}
func (suite *IndexTestSuite) TestNewestIndexedItemIDEmpty() {
// the newest indexed post should be an empty string since there's nothing indexed yet
postID, err := suite.timeline.NewestIndexedItemID(context.Background())
suite.NoError(err)
suite.Empty(postID)
// indexLength should be 0
indexLength := suite.timeline.ItemIndexLength(context.Background())
suite.Equal(0, indexLength)
}
func (suite *IndexTestSuite) TestIndexAlreadyIndexed() {
testStatus := suite.testStatuses["local_account_1_status_1"]

View file

@ -24,26 +24,26 @@ import (
"errors"
)
type itemIndex struct {
type indexedItems struct {
data *list.List
skipInsert SkipInsertFunction
}
type itemIndexEntry struct {
type indexedItemsEntry struct {
itemID string
boostOfID string
accountID string
boostOfAccountID string
}
func (p *itemIndex) insertIndexed(ctx context.Context, i *itemIndexEntry) (bool, error) {
if p.data == nil {
p.data = &list.List{}
func (i *indexedItems) insertIndexed(ctx context.Context, newEntry *indexedItemsEntry) (bool, error) {
if i.data == nil {
i.data = &list.List{}
}
// if we have no entries yet, this is both the newest and oldest entry, so just put it in the front
if p.data.Len() == 0 {
p.data.PushFront(i)
if i.data.Len() == 0 {
i.data.PushFront(newEntry)
return true, nil
}
@ -51,15 +51,15 @@ func (p *itemIndex) insertIndexed(ctx context.Context, i *itemIndexEntry) (bool,
var position int
// We need to iterate through the index to make sure we put this item in the appropriate place according to when it was created.
// We also need to make sure we're not inserting a duplicate item -- this can happen sometimes and it's not nice UX (*shudder*).
for e := p.data.Front(); e != nil; e = e.Next() {
for e := i.data.Front(); e != nil; e = e.Next() {
position++
entry, ok := e.Value.(*itemIndexEntry)
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return false, errors.New("index: could not parse e as an itemIndexEntry")
return false, errors.New("insertIndexed: could not parse e as an indexedItemsEntry")
}
skip, err := p.skipInsert(ctx, i.itemID, i.accountID, i.boostOfID, i.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position)
skip, err := i.skipInsert(ctx, newEntry.itemID, newEntry.accountID, newEntry.boostOfID, newEntry.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position)
if err != nil {
return false, err
}
@ -69,18 +69,18 @@ func (p *itemIndex) insertIndexed(ctx context.Context, i *itemIndexEntry) (bool,
// if the item to index is newer than e, insert it before e in the list
if insertMark == nil {
if i.itemID > entry.itemID {
if newEntry.itemID > entry.itemID {
insertMark = e
}
}
}
if insertMark != nil {
p.data.InsertBefore(i, insertMark)
i.data.InsertBefore(newEntry, insertMark)
return true, nil
}
// if we reach this point it's the oldest item we've seen so put it at the back
p.data.PushBack(i)
i.data.PushBack(newEntry)
return true, nil
}

View file

@ -23,15 +23,12 @@ import (
"fmt"
"strings"
"sync"
"time"
"codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
const (
desiredPostIndexLength = 400
)
// Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines.
//
// By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed
@ -65,8 +62,6 @@ type Manager interface {
GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error)
// GetIndexedLength returns the amount of items that have been *indexed* for the given account ID.
GetIndexedLength(ctx context.Context, timelineAccountID string) int
// GetDesiredIndexLength returns the amount of items that we, ideally, index for each user.
GetDesiredIndexLength(ctx context.Context) int
// GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given account.
GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error)
// PrepareXFromTop prepares limit n amount of items, based on their indexed representations, from the top of the index.
@ -77,6 +72,10 @@ type Manager interface {
WipeItemFromAllTimelines(ctx context.Context, itemID string) error
// WipeStatusesFromAccountID removes all items by the given accountID from the timelineAccountID's timelines.
WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error
// Start starts hourly cleanup jobs for this timeline manager.
Start() error
// Stop stops the timeline manager (currently a stub, doesn't do anything).
Stop() error
}
// NewManager returns a new timeline manager.
@ -98,9 +97,44 @@ type manager struct {
skipInsertFunction SkipInsertFunction
}
func (m *manager) Start() error {
// range through all timelines in the sync map once per hour + prune as necessary
go func() {
for now := range time.NewTicker(1 * time.Hour).C {
m.accountTimelines.Range(func(key any, value any) bool {
timelineAccountID, ok := key.(string)
if !ok {
panic("couldn't parse timeline manager sync map key as string, this should never happen so panic")
}
t, ok := value.(Timeline)
if !ok {
panic("couldn't parse timeline manager sync map value as Timeline, this should never happen so panic")
}
anHourAgo := now.Add(-1 * time.Hour)
if lastGot := t.LastGot(); lastGot.Before(anHourAgo) {
amountPruned := t.Prune(defaultDesiredPreparedItemsLength, defaultDesiredIndexedItemsLength)
log.WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"amountPruned", amountPruned},
}...).Info("pruned indexed and prepared items from timeline")
}
return true
})
}
}()
return nil
}
func (m *manager) Stop() error {
return nil
}
func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) {
l := log.WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"itemID", item.GetID()},
}...)
@ -116,7 +150,6 @@ func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccount
func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) {
l := log.WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"itemID", item.GetID()},
}...)
@ -132,7 +165,6 @@ func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timel
func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) {
l := log.WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
{"itemID", itemID},
}...)
@ -147,10 +179,7 @@ func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID s
}
func (m *manager) GetTimeline(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) {
l := log.WithFields(kv.Fields{
{"timelineAccountID", timelineAccountID},
}...)
l := log.WithFields(kv.Fields{{"timelineAccountID", timelineAccountID}}...)
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {
@ -173,10 +202,6 @@ func (m *manager) GetIndexedLength(ctx context.Context, timelineAccountID string
return t.ItemIndexLength(ctx)
}
func (m *manager) GetDesiredIndexLength(ctx context.Context) int {
return desiredPostIndexLength
}
func (m *manager) GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) {
t, err := m.getOrCreateTimeline(ctx, timelineAccountID)
if err != nil {

View file

@ -26,154 +26,12 @@ import (
"codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
func (t *timeline) prepareNextQuery(ctx context.Context, amount int, maxID string, sinceID string, minID string) error {
l := log.WithFields(kv.Fields{
{"amount", amount},
{"maxID", maxID},
{"sinceID", sinceID},
{"minID", minID},
}...)
var err error
// maxID is defined but sinceID isn't so take from behind
if maxID != "" && sinceID == "" {
l.Debug("preparing behind maxID")
err = t.PrepareBehind(ctx, maxID, amount)
}
// maxID isn't defined, but sinceID || minID are, so take x before
if maxID == "" && sinceID != "" {
l.Debug("preparing before sinceID")
err = t.PrepareBefore(ctx, sinceID, false, amount)
}
if maxID == "" && minID != "" {
l.Debug("preparing before minID")
err = t.PrepareBefore(ctx, minID, false, amount)
}
return err
}
func (t *timeline) PrepareBehind(ctx context.Context, itemID string, amount int) error {
// lazily initialize prepared items if it hasn't been done already
if t.preparedItems.data == nil {
t.preparedItems.data = &list.List{}
t.preparedItems.data.Init()
}
if err := t.IndexBehind(ctx, itemID, amount); err != nil {
return fmt.Errorf("PrepareBehind: error indexing behind id %s: %s", itemID, err)
}
// if the itemindex is nil, nothing has been indexed yet so there's nothing to prepare
if t.itemIndex.data == nil {
return nil
}
var prepared int
var preparing bool
t.Lock()
defer t.Unlock()
prepareloop:
for e := t.itemIndex.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*itemIndexEntry)
if !ok {
return errors.New("PrepareBehind: could not parse e as itemIndexEntry")
}
if !preparing {
// we haven't hit the position we need to prepare from yet
if entry.itemID == itemID {
preparing = true
}
}
if preparing {
if err := t.prepare(ctx, entry.itemID); err != nil {
// there's been an error
if err != db.ErrNoEntries {
// it's a real error
return fmt.Errorf("PrepareBehind: error preparing item with id %s: %s", entry.itemID, err)
}
// the status just doesn't exist (anymore) so continue to the next one
continue
}
if prepared == amount {
// we're done
break prepareloop
}
prepared++
}
}
return nil
}
func (t *timeline) PrepareBefore(ctx context.Context, statusID string, include bool, amount int) error {
t.Lock()
defer t.Unlock()
// lazily initialize prepared posts if it hasn't been done already
if t.preparedItems.data == nil {
t.preparedItems.data = &list.List{}
t.preparedItems.data.Init()
}
// if the postindex is nil, nothing has been indexed yet so there's nothing to prepare
if t.itemIndex.data == nil {
return nil
}
var prepared int
var preparing bool
prepareloop:
for e := t.itemIndex.data.Back(); e != nil; e = e.Prev() {
entry, ok := e.Value.(*itemIndexEntry)
if !ok {
return errors.New("PrepareBefore: could not parse e as a postIndexEntry")
}
if !preparing {
// we haven't hit the position we need to prepare from yet
if entry.itemID == statusID {
preparing = true
if !include {
continue
}
}
}
if preparing {
if err := t.prepare(ctx, entry.itemID); err != nil {
// there's been an error
if err != db.ErrNoEntries {
// it's a real error
return fmt.Errorf("PrepareBefore: error preparing status with id %s: %s", entry.itemID, err)
}
// the status just doesn't exist (anymore) so continue to the next one
continue
}
if prepared == amount {
// we're done
break prepareloop
}
prepared++
}
}
return nil
}
func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error {
l := log.WithFields(kv.Fields{
{"amount", amount},
}...)
l := log.WithFields(kv.Fields{{"amount", amount}}...)
// lazily initialize prepared posts if it hasn't been done already
if t.preparedItems.data == nil {
@ -182,10 +40,10 @@ func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error {
}
// if the postindex is nil, nothing has been indexed yet so index from the highest ID possible
if t.itemIndex.data == nil {
if t.indexedItems.data == nil {
l.Debug("postindex.data was nil, indexing behind highest possible ID")
if err := t.IndexBehind(ctx, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", amount); err != nil {
return fmt.Errorf("PrepareFromTop: error indexing behind id %s: %s", "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", err)
if err := t.indexBehind(ctx, id.Highest, amount); err != nil {
return fmt.Errorf("PrepareFromTop: error indexing behind id %s: %s", id.Highest, err)
}
}
@ -194,12 +52,12 @@ func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error {
defer t.Unlock()
var prepared int
prepareloop:
for e := t.itemIndex.data.Front(); e != nil; e = e.Next() {
for e := t.indexedItems.data.Front(); e != nil; e = e.Next() {
if e == nil {
continue
}
entry, ok := e.Value.(*itemIndexEntry)
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return errors.New("PrepareFromTop: could not parse e as a postIndexEntry")
}
@ -226,6 +84,142 @@ prepareloop:
return nil
}
func (t *timeline) prepareNextQuery(ctx context.Context, amount int, maxID string, sinceID string, minID string) error {
l := log.WithFields(kv.Fields{
{"amount", amount},
{"maxID", maxID},
{"sinceID", sinceID},
{"minID", minID},
}...)
var err error
switch {
case maxID != "" && sinceID == "":
l.Debug("preparing behind maxID")
err = t.prepareBehind(ctx, maxID, amount)
case maxID == "" && sinceID != "":
l.Debug("preparing before sinceID")
err = t.prepareBefore(ctx, sinceID, false, amount)
case maxID == "" && minID != "":
l.Debug("preparing before minID")
err = t.prepareBefore(ctx, minID, false, amount)
}
return err
}
// prepareBehind instructs the timeline to prepare the next amount of entries for serialization, from position onwards.
// If include is true, then the given item ID will also be prepared, otherwise only entries behind it will be prepared.
func (t *timeline) prepareBehind(ctx context.Context, itemID string, amount int) error {
// lazily initialize prepared items if it hasn't been done already
if t.preparedItems.data == nil {
t.preparedItems.data = &list.List{}
t.preparedItems.data.Init()
}
if err := t.indexBehind(ctx, itemID, amount); err != nil {
return fmt.Errorf("prepareBehind: error indexing behind id %s: %s", itemID, err)
}
// if the itemindex is nil, nothing has been indexed yet so there's nothing to prepare
if t.indexedItems.data == nil {
return nil
}
var prepared int
var preparing bool
t.Lock()
defer t.Unlock()
prepareloop:
for e := t.indexedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return errors.New("prepareBehind: could not parse e as itemIndexEntry")
}
if !preparing {
// we haven't hit the position we need to prepare from yet
if entry.itemID == itemID {
preparing = true
}
}
if preparing {
if err := t.prepare(ctx, entry.itemID); err != nil {
// there's been an error
if err != db.ErrNoEntries {
// it's a real error
return fmt.Errorf("prepareBehind: error preparing item with id %s: %s", entry.itemID, err)
}
// the status just doesn't exist (anymore) so continue to the next one
continue
}
if prepared == amount {
// we're done
break prepareloop
}
prepared++
}
}
return nil
}
func (t *timeline) prepareBefore(ctx context.Context, statusID string, include bool, amount int) error {
t.Lock()
defer t.Unlock()
// lazily initialize prepared posts if it hasn't been done already
if t.preparedItems.data == nil {
t.preparedItems.data = &list.List{}
t.preparedItems.data.Init()
}
// if the postindex is nil, nothing has been indexed yet so there's nothing to prepare
if t.indexedItems.data == nil {
return nil
}
var prepared int
var preparing bool
prepareloop:
for e := t.indexedItems.data.Back(); e != nil; e = e.Prev() {
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return errors.New("prepareBefore: could not parse e as a postIndexEntry")
}
if !preparing {
// we haven't hit the position we need to prepare from yet
if entry.itemID == statusID {
preparing = true
if !include {
continue
}
}
}
if preparing {
if err := t.prepare(ctx, entry.itemID); err != nil {
// there's been an error
if err != db.ErrNoEntries {
// it's a real error
return fmt.Errorf("prepareBefore: error preparing status with id %s: %s", entry.itemID, err)
}
// the status just doesn't exist (anymore) so continue to the next one
continue
}
if prepared == amount {
// we're done
break prepareloop
}
prepared++
}
}
return nil
}
func (t *timeline) prepare(ctx context.Context, itemID string) error {
// trigger the caller-provided prepare function
prepared, err := t.prepareFunction(ctx, t.accountID, itemID)
@ -245,7 +239,9 @@ func (t *timeline) prepare(ctx context.Context, itemID string) error {
return t.preparedItems.insertPrepared(ctx, preparedItemsEntry)
}
func (t *timeline) OldestPreparedItemID(ctx context.Context) (string, error) {
// oldestPreparedItemID returns the id of the rearmost (ie., the oldest) prepared item, or an error if something goes wrong.
// If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this.
func (t *timeline) oldestPreparedItemID(ctx context.Context) (string, error) {
var id string
if t.preparedItems == nil || t.preparedItems.data == nil {
// return an empty string if prepared items hasn't been initialized yet
@ -260,7 +256,7 @@ func (t *timeline) OldestPreparedItemID(ctx context.Context) (string, error) {
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return id, errors.New("OldestPreparedItemID: could not parse e as a preparedItemsEntry")
return id, errors.New("oldestPreparedItemID: could not parse e as a preparedItemsEntry")
}
return entry.itemID, nil

View file

@ -37,30 +37,30 @@ type preparedItemsEntry struct {
prepared Preparable
}
func (p *preparedItems) insertPrepared(ctx context.Context, i *preparedItemsEntry) error {
func (p *preparedItems) insertPrepared(ctx context.Context, newEntry *preparedItemsEntry) error {
if p.data == nil {
p.data = &list.List{}
}
// if we have no entries yet, this is both the newest and oldest entry, so just put it in the front
if p.data.Len() == 0 {
p.data.PushFront(i)
p.data.PushFront(newEntry)
return nil
}
var insertMark *list.Element
var position int
// We need to iterate through the index to make sure we put this post in the appropriate place according to when it was created.
// We also need to make sure we're not inserting a duplicate post -- this can happen sometimes and it's not nice UX (*shudder*).
// We need to iterate through the index to make sure we put this entry in the appropriate place according to when it was created.
// We also need to make sure we're not inserting a duplicate entry -- this can happen sometimes and it's not nice UX (*shudder*).
for e := p.data.Front(); e != nil; e = e.Next() {
position++
entry, ok := e.Value.(*preparedItemsEntry)
if !ok {
return errors.New("index: could not parse e as a preparedPostsEntry")
return errors.New("insertPrepared: could not parse e as a preparedItemsEntry")
}
skip, err := p.skipInsert(ctx, i.itemID, i.accountID, i.boostOfID, i.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position)
skip, err := p.skipInsert(ctx, newEntry.itemID, newEntry.accountID, newEntry.boostOfID, newEntry.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position)
if err != nil {
return err
}
@ -68,25 +68,25 @@ func (p *preparedItems) insertPrepared(ctx context.Context, i *preparedItemsEntr
return nil
}
// if the post to index is newer than e, insert it before e in the list
// if the entry to index is newer than e, insert it before e in the list
if insertMark == nil {
if i.itemID > entry.itemID {
if newEntry.itemID > entry.itemID {
insertMark = e
}
}
// make sure we don't insert a duplicate
if entry.itemID == i.itemID {
if entry.itemID == newEntry.itemID {
return nil
}
}
if insertMark != nil {
p.data.InsertBefore(i, insertMark)
p.data.InsertBefore(newEntry, insertMark)
return nil
}
// if we reach this point it's the oldest post we've seen so put it at the back
p.data.PushBack(i)
// if we reach this point it's the oldest entry we've seen so put it at the back
p.data.PushBack(newEntry)
return nil
}

View file

@ -0,0 +1,68 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package timeline
import (
"container/list"
)
const (
defaultDesiredIndexedItemsLength = 400
defaultDesiredPreparedItemsLength = 50
)
func (t *timeline) Prune(desiredPreparedItemsLength int, desiredIndexedItemsLength int) int {
t.Lock()
defer t.Unlock()
pruneList := func(pruneTo int, listToPrune *list.List) int {
if listToPrune == nil {
// no need to prune
return 0
}
unprunedLength := listToPrune.Len()
if unprunedLength <= pruneTo {
// no need to prune
return 0
}
// work from the back + assemble a slice of entries that we will prune
amountStillToPrune := unprunedLength - pruneTo
itemsToPrune := make([]*list.Element, 0, amountStillToPrune)
for e := listToPrune.Back(); amountStillToPrune > 0; e = e.Prev() {
itemsToPrune = append(itemsToPrune, e)
amountStillToPrune--
}
// remove the entries we found
var totalPruned int
for _, e := range itemsToPrune {
listToPrune.Remove(e)
totalPruned++
}
return totalPruned
}
prunedPrepared := pruneList(desiredPreparedItemsLength, t.preparedItems.data)
prunedIndexed := pruneList(desiredIndexedItemsLength, t.indexedItems.data)
return prunedPrepared + prunedIndexed
}

View file

@ -0,0 +1,110 @@
/*
GoToSocial
Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package timeline_test
import (
"context"
"sort"
"testing"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/visibility"
"github.com/superseriousbusiness/gotosocial/testrig"
)
type PruneTestSuite struct {
TimelineStandardTestSuite
}
func (suite *PruneTestSuite) SetupSuite() {
suite.testAccounts = testrig.NewTestAccounts()
suite.testStatuses = testrig.NewTestStatuses()
}
func (suite *PruneTestSuite) SetupTest() {
testrig.InitTestLog()
testrig.InitTestConfig()
suite.db = testrig.NewTestDB()
suite.tc = testrig.NewTestTypeConverter(suite.db)
suite.filter = visibility.NewFilter(suite.db)
testrig.StandardDBSetup(suite.db, nil)
// let's take local_account_1 as the timeline owner
tl, err := timeline.NewTimeline(
context.Background(),
suite.testAccounts["local_account_1"].ID,
processing.StatusGrabFunction(suite.db),
processing.StatusFilterFunction(suite.db, suite.filter),
processing.StatusPrepareFunction(suite.db, suite.tc),
processing.StatusSkipInsertFunction(),
)
if err != nil {
suite.FailNow(err.Error())
}
// put the status IDs in a determinate order since we can't trust a map to keep its order
statuses := []*gtsmodel.Status{}
for _, s := range suite.testStatuses {
statuses = append(statuses, s)
}
sort.Slice(statuses, func(i, j int) bool {
return statuses[i].ID > statuses[j].ID
})
// prepare the timeline by just shoving all test statuses in it -- let's not be fussy about who sees what
for _, s := range statuses {
_, err := tl.IndexAndPrepareOne(context.Background(), s.GetID(), s.BoostOfID, s.AccountID, s.BoostOfAccountID)
if err != nil {
suite.FailNow(err.Error())
}
}
suite.timeline = tl
}
func (suite *PruneTestSuite) TearDownTest() {
testrig.StandardDBTeardown(suite.db)
}
func (suite *PruneTestSuite) TestPrune() {
// prune down to 5 prepared + 5 indexed
suite.Equal(24, suite.timeline.Prune(5, 5))
suite.Equal(5, suite.timeline.ItemIndexLength(context.Background()))
}
func (suite *PruneTestSuite) TestPruneTo0() {
// prune down to 0 prepared + 0 indexed
suite.Equal(34, suite.timeline.Prune(0, 0))
suite.Equal(0, suite.timeline.ItemIndexLength(context.Background()))
}
func (suite *PruneTestSuite) TestPruneToInfinityAndBeyond() {
// prune to 99999, this should result in no entries being pruned
suite.Equal(0, suite.timeline.Prune(99999, 99999))
suite.Equal(17, suite.timeline.ItemIndexLength(context.Background()))
}
func TestPruneTestSuite(t *testing.T) {
suite.Run(t, new(PruneTestSuite))
}

View file

@ -29,7 +29,6 @@ import (
func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) {
l := log.WithFields(kv.Fields{
{"accountTimeline", t.accountID},
{"statusID", statusID},
}...)
@ -40,9 +39,9 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) {
// remove entr(ies) from the post index
removeIndexes := []*list.Element{}
if t.itemIndex != nil && t.itemIndex.data != nil {
for e := t.itemIndex.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*itemIndexEntry)
if t.indexedItems != nil && t.indexedItems.data != nil {
for e := t.indexedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return removed, errors.New("Remove: could not parse e as a postIndexEntry")
}
@ -53,7 +52,7 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) {
}
}
for _, e := range removeIndexes {
t.itemIndex.data.Remove(e)
t.indexedItems.data.Remove(e)
removed++
}
@ -82,19 +81,19 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) {
func (t *timeline) RemoveAllBy(ctx context.Context, accountID string) (int, error) {
l := log.WithFields(kv.Fields{
{"accountTimeline", t.accountID},
{"accountID", accountID},
}...)
t.Lock()
defer t.Unlock()
var removed int
// remove entr(ies) from the post index
removeIndexes := []*list.Element{}
if t.itemIndex != nil && t.itemIndex.data != nil {
for e := t.itemIndex.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*itemIndexEntry)
if t.indexedItems != nil && t.indexedItems.data != nil {
for e := t.indexedItems.data.Front(); e != nil; e = e.Next() {
entry, ok := e.Value.(*indexedItemsEntry)
if !ok {
return removed, errors.New("Remove: could not parse e as a postIndexEntry")
}
@ -105,7 +104,7 @@ func (t *timeline) RemoveAllBy(ctx context.Context, accountID string) (int, erro
}
}
for _, e := range removeIndexes {
t.itemIndex.data.Remove(e)
t.indexedItems.data.Remove(e)
removed++
}

View file

@ -21,6 +21,7 @@ package timeline
import (
"context"
"sync"
"time"
)
// GrabFunction is used by a Timeline to grab more items to index.
@ -73,26 +74,9 @@ type Timeline interface {
// If prepareNext is true, then the next predicted query will be prepared already in a goroutine,
// to make the next call to Get faster.
Get(ctx context.Context, amount int, maxID string, sinceID string, minID string, prepareNext bool) ([]Preparable, error)
// GetXFromTop returns x amount of items from the top of the timeline, from newest to oldest.
GetXFromTop(ctx context.Context, amount int) ([]Preparable, error)
// GetXBehindID returns x amount of items from the given id onwards, from newest to oldest.
// This will NOT include the item with the given ID.
//
// This corresponds to an api call to /timelines/home?max_id=WHATEVER
GetXBehindID(ctx context.Context, amount int, fromID string, attempts *int) ([]Preparable, error)
// GetXBeforeID returns x amount of items up to the given id, from newest to oldest.
// This will NOT include the item with the given ID.
//
// This corresponds to an api call to /timelines/home?since_id=WHATEVER
GetXBeforeID(ctx context.Context, amount int, sinceID string, startFromTop bool) ([]Preparable, error)
// GetXBetweenID returns x amount of items from the given maxID, up to the given id, from newest to oldest.
// This will NOT include the item with the given IDs.
//
// This corresponds to an api call to /timelines/home?since_id=WHATEVER&max_id=WHATEVER_ELSE
GetXBetweenID(ctx context.Context, amount int, maxID string, sinceID string) ([]Preparable, error)
/*
INDEXING FUNCTIONS
INDEXING + PREPARATION FUNCTIONS
*/
// IndexOne puts a item into the timeline at the appropriate place according to its 'createdAt' property.
@ -100,35 +84,14 @@ type Timeline interface {
// The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false
// if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline.
IndexOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error)
// OldestIndexedItemID returns the id of the rearmost (ie., the oldest) indexed item, or an error if something goes wrong.
// If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this.
OldestIndexedItemID(ctx context.Context) (string, error)
// NewestIndexedItemID returns the id of the frontmost (ie., the newest) indexed item, or an error if something goes wrong.
// If nothing goes wrong but there's no newest item, an empty string will be returned so make sure to check for this.
NewestIndexedItemID(ctx context.Context) (string, error)
IndexBefore(ctx context.Context, itemID string, amount int) error
IndexBehind(ctx context.Context, itemID string, amount int) error
/*
PREPARATION FUNCTIONS
*/
// PrepareXFromTop instructs the timeline to prepare x amount of items from the top of the timeline.
PrepareFromTop(ctx context.Context, amount int) error
// PrepareBehind instructs the timeline to prepare the next amount of entries for serialization, from position onwards.
// If include is true, then the given item ID will also be prepared, otherwise only entries behind it will be prepared.
PrepareBehind(ctx context.Context, itemID string, amount int) error
// IndexOne puts a item into the timeline at the appropriate place according to its 'createdAt' property,
// IndexAndPrepareOne puts a item into the timeline at the appropriate place according to its 'createdAt' property,
// and then immediately prepares it.
//
// The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false
// if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline.
IndexAndPrepareOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error)
// OldestPreparedItemID returns the id of the rearmost (ie., the oldest) prepared item, or an error if something goes wrong.
// If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this.
OldestPreparedItemID(ctx context.Context) (string, error)
// PrepareXFromTop instructs the timeline to prepare x amount of items from the top of the timeline, useful during init.
PrepareFromTop(ctx context.Context, amount int) error
/*
INFO FUNCTIONS
@ -136,13 +99,24 @@ type Timeline interface {
// ActualPostIndexLength returns the actual length of the item index at this point in time.
ItemIndexLength(ctx context.Context) int
// OldestIndexedItemID returns the id of the rearmost (ie., the oldest) indexed item, or an error if something goes wrong.
// If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this.
OldestIndexedItemID(ctx context.Context) (string, error)
// NewestIndexedItemID returns the id of the frontmost (ie., the newest) indexed item, or an error if something goes wrong.
// If nothing goes wrong but there's no newest item, an empty string will be returned so make sure to check for this.
NewestIndexedItemID(ctx context.Context) (string, error)
/*
UTILITY FUNCTIONS
*/
// Reset instructs the timeline to reset to its base state -- cache only the minimum amount of items.
Reset() error
// LastGot returns the time that Get was last called.
LastGot() time.Time
// Prune prunes preparedItems and indexedItems in this timeline to the desired lengths.
// This will be a no-op if the lengths are already < the desired values.
// Prune acquires a lock on the timeline before pruning.
// The return value is the combined total of items pruned from preparedItems and indexedItems.
Prune(desiredPreparedItemsLength int, desiredIndexedItemsLength int) int
// Remove removes a item from both the index and prepared items.
//
// If a item has multiple entries in a timeline, they will all be removed.
@ -157,12 +131,13 @@ type Timeline interface {
// timeline fulfils the Timeline interface
type timeline struct {
itemIndex *itemIndex
indexedItems *indexedItems
preparedItems *preparedItems
grabFunction GrabFunction
filterFunction FilterFunction
prepareFunction PrepareFunction
accountID string
lastGot time.Time
sync.Mutex
}
@ -175,7 +150,7 @@ func NewTimeline(
prepareFunction PrepareFunction,
skipInsertFunction SkipInsertFunction) (Timeline, error) {
return &timeline{
itemIndex: &itemIndex{
indexedItems: &indexedItems{
skipInsert: skipInsertFunction,
},
preparedItems: &preparedItems{
@ -185,17 +160,6 @@ func NewTimeline(
filterFunction: filterFunction,
prepareFunction: prepareFunction,
accountID: timelineAccountID,
lastGot: time.Time{},
}, nil
}
func (t *timeline) Reset() error {
return nil
}
func (t *timeline) ItemIndexLength(ctx context.Context) int {
if t.itemIndex == nil || t.itemIndex.data == nil {
return 0
}
return t.itemIndex.data.Len()
}

View file

@ -34,13 +34,6 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/log"
)
// const (
// // highestID is the highest possible ULID
// highestID = "ZZZZZZZZZZZZZZZZZZZZZZZZZZ"
// // lowestID is the lowest possible ULID
// lowestID = "00000000000000000000000000"
// )
// Converts a gts model account into an Activity Streams person type.
func (c *converter) AccountToAS(ctx context.Context, a *gtsmodel.Account) (vocab.ActivityStreamsPerson, error) {
person := streams.NewActivityStreamsPerson()