mirror of
synced 2025-03-14 20:20:51 +03:00
start hacking away at the federating db
This commit is contained in:
10 changed files with 279 additions and 140 deletions
@ -19,32 +19,37 @@ package users
import (
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtserror" //nolint:typecheck
var errMalformed = errors.New("inbox POST could not be processed as an ActivityPub request; request may be malformed or insufficient") // mood
// InboxPOSTHandler deals with incoming POST requests to an actor's inbox.
// Eg., POST to https://example.org/users/whatever/inbox.
func (m *Module) InboxPOSTHandler(c *gin.Context) {
// usernames on our instance are always lowercase
requestedUsername := strings.ToLower(c.Param(UsernameKey))
if requestedUsername == "" {
err := errors.New("no username specified in request")
apiutil.ErrorHandler(c, gtserror.NewErrorBadRequest(err, err.Error()), m.processor.InstanceGetV1)
handled, err := m.processor.Fedi().InboxPost(apiutil.TransferSignatureContext(c), c.Writer, c.Request)
if err != nil {
// If a WithCode was returned to us, we can be nice and
// try to return something informative to the caller.
if withCode := new(gtserror.WithCode); errors.As(err, withCode) {
apiutil.ErrorHandler(c, *withCode, m.processor.InstanceGetV1)
// Something else went wrong, log the error but don't
// return it to the caller, to avoid leaking internals.
log.WithContext(c.Request.Context()).Errorf("returning Bad Request to caller, err was: %q", err)
apiutil.ErrorHandler(c, gtserror.NewErrorBadRequest(err), m.processor.InstanceGetV1)
if posted, err := m.processor.Fedi().InboxPost(apiutil.TransferSignatureContext(c), c.Writer, c.Request); err != nil {
if withCode, ok := err.(gtserror.WithCode); ok {
apiutil.ErrorHandler(c, withCode, m.processor.InstanceGetV1)
} else {
apiutil.ErrorHandler(c, gtserror.NewErrorBadRequest(err, err.Error()), m.processor.InstanceGetV1)
} else if !posted {
err := errors.New("unable to process request")
apiutil.ErrorHandler(c, gtserror.NewErrorBadRequest(err, err.Error()), m.processor.InstanceGetV1)
if !handled {
// Wasn't a type we could understand; we should let the caller know this.
withCode := gtserror.NewErrorBadRequest(errMalformed, errMalformed.Error())
apiutil.ErrorHandler(c, withCode, m.processor.InstanceGetV1)
@ -21,6 +21,8 @@ import (
@ -35,6 +37,7 @@ import (
@ -44,11 +47,79 @@ type InboxPostTestSuite struct {
func (suite *InboxPostTestSuite) TestPostBlock() {
blockingAccount := suite.testAccounts["remote_account_1"]
blockedAccount := suite.testAccounts["local_account_1"]
blockURI := testrig.URLMustParse("http://fossbros-anonymous.io/users/foss_satan/blocks/01FG9C441MCTW3R2W117V2PQK3")
func (suite *InboxPostTestSuite) inboxPost(
activity pub.Activity,
requestingAccount *gtsmodel.Account,
targetAccount *gtsmodel.Account,
expectedHTTPStatus int,
expectedBody string,
) {
var (
recorder = httptest.NewRecorder()
ctx, _ = testrig.CreateGinTestContext(recorder, nil)
// Prepare the requst body bytes.
bodyI, err := ap.Serialize(activity)
if err != nil {
b, err := json.Marshal(bodyI)
if err != nil {
// Prepare signature headers for this Activity.
signature, digestHeader, dateHeader := testrig.GetSignatureForActivity(
// Put the request together.
ctx.AddParam(users.UsernameKey, targetAccount.Username)
ctx.Request = httptest.NewRequest(http.MethodPost, targetAccount.InboxURI, bytes.NewReader(b))
ctx.Request.Header.Set("Signature", signature)
ctx.Request.Header.Set("Date", dateHeader)
ctx.Request.Header.Set("Digest", digestHeader)
ctx.Request.Header.Set("Content-Type", "application/activity+json")
// Pass the context through signature check
// middleware first to populate it appropriately.
// Trigger the function being tested.
// Read the result.
result := recorder.Result()
defer result.Body.Close()
b, err = io.ReadAll(result.Body)
if err != nil {
errs := gtserror.MultiError{}
// Check expected code + body.
if resultCode := recorder.Code; expectedHTTPStatus != resultCode {
errs = append(errs, fmt.Sprintf("expected %d got %d", expectedHTTPStatus, resultCode))
// If we got an expected body, return early.
if expectedBody != "" && string(b) != expectedBody {
errs = append(errs, fmt.Sprintf("expected %s got %s", expectedBody, string(b)))
if err := errs.Combine(); err != nil {
suite.FailNow("", "%v (body %s)", err, string(b))
func (suite *InboxPostTestSuite) newBlock(blockID string, blockingAccount *gtsmodel.Account, blockedAccount *gtsmodel.Account) vocab.ActivityStreamsBlock {
block := streams.NewActivityStreamsBlock()
// set the actor property to the block-ing account's URI
@ -59,7 +130,7 @@ func (suite *InboxPostTestSuite) TestPostBlock() {
// set the ID property to the blocks's URI
idProp := streams.NewJSONLDIdProperty()
// set the object property to the target account's URI
@ -74,57 +145,32 @@ func (suite *InboxPostTestSuite) TestPostBlock() {
targetURI := testrig.URLMustParse(blockedAccount.InboxURI)
return block
signature, digestHeader, dateHeader := testrig.GetSignatureForActivity(block, blockingAccount.PublicKeyURI, blockingAccount.PrivateKey, targetURI)
bodyI, err := ap.Serialize(block)
func (suite *InboxPostTestSuite) TestPostBlock() {
var (
requestingAccount = suite.testAccounts["remote_account_1"]
targetAccount = suite.testAccounts["local_account_1"]
activityID = requestingAccount.URI + "/some-new-activity/01FG9C441MCTW3R2W117V2PQK3"
bodyJson, err := json.Marshal(bodyI)
body := bytes.NewReader(bodyJson)
tc := testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../../testrig/media"))
federator := testrig.NewTestFederator(&suite.state, tc, suite.mediaManager)
emailSender := testrig.NewEmailSender("../../../../web/template/", nil)
processor := testrig.NewTestProcessor(&suite.state, federator, emailSender, suite.mediaManager)
userModule := users.New(processor)
// setup request
recorder := httptest.NewRecorder()
ctx, _ := testrig.CreateGinTestContext(recorder, nil)
ctx.Request = httptest.NewRequest(http.MethodPost, targetURI.String(), body) // the endpoint we're hitting
ctx.Request.Header.Set("Signature", signature)
ctx.Request.Header.Set("Date", dateHeader)
ctx.Request.Header.Set("Digest", digestHeader)
ctx.Request.Header.Set("Content-Type", "application/activity+json")
// we need to pass the context through signature check first to set appropriate values on it
// normally the router would populate these params from the path values,
// but because we're calling the function directly, we need to set them manually.
ctx.Params = gin.Params{
Key: users.UsernameKey,
Value: blockedAccount.Username,
// trigger the function being tested
result := recorder.Result()
defer result.Body.Close()
b, err := ioutil.ReadAll(result.Body)
block := suite.newBlock(activityID, requestingAccount, targetAccount)
suite.inboxPost(block, requestingAccount, targetAccount, http.StatusAccepted, "")
// there should be a block in the database now between the accounts
dbBlock, err := suite.db.GetBlock(context.Background(), blockingAccount.ID, blockedAccount.ID)
var (
dbBlock *gtsmodel.Block
err error
if !testrig.WaitFor(func() bool {
dbBlock, err = suite.db.GetBlock(context.Background(), requestingAccount.ID, targetAccount.ID)
return err == nil && dbBlock != nil
}) {
suite.FailNow("timed out waiting for block to be created")
suite.WithinDuration(time.Now(), dbBlock.CreatedAt, 30*time.Second)
suite.WithinDuration(time.Now(), dbBlock.UpdatedAt, 30*time.Second)
suite.Equal("http://fossbros-anonymous.io/users/foss_satan/blocks/01FG9C441MCTW3R2W117V2PQK3", dbBlock.URI)
@ -20,6 +20,7 @@ package federation
import (
@ -152,7 +153,7 @@ func (f *federatingActor) PostInboxScheme(ctx context.Context, w http.ResponseWr
activity, ok := t.(pub.Activity)
if !ok {
err = fmt.Errorf("ActivityStreams value with type %T is not a pub.Activity", t)
err = fmt.Errorf("PostInboxScheme: ActivityStreams value with type %T is not a pub.Activity", t)
return true, err
@ -199,20 +200,22 @@ func (f *federatingActor) PostInboxScheme(ctx context.Context, w http.ResponseWr
// target properties needed to be populated, but weren't.
// Send the rejection to the peer.
if err == pub.ErrObjectRequired || err == pub.ErrTargetRequired {
if errors.Is(err, pub.ErrObjectRequired) || errors.Is(err, pub.ErrTargetRequired) {
l.Debugf("malformed incoming Activity: %s", err)
return true, nil
// There's been some real error.
err = fmt.Errorf("PostInboxScheme: error calling sideEffectActor.PostInbox: %w", err)
return true, err
// Our side effects are complete, now delegate determining whether to do inbox forwarding, as well as the action to do it.
if err := f.sideEffectActor.InboxForwarding(ctx, inboxID, activity); err != nil {
err = fmt.Errorf("PostInboxScheme: error calling sideEffectActor.InboxForwarding: %w", err)
return true, err
// // Our side effects are complete, now delegate determining whether to do inbox forwarding, as well as the action to do it.
// if err := f.sideEffectActor.InboxForwarding(ctx, inboxID, activity); err != nil {
// err = fmt.Errorf("PostInboxScheme: error calling sideEffectActor.InboxForwarding: %w", err)
// return true, err
// }
// Request is now undergoing processing.
// Respond with an Accepted status.
Normal file
Normal file
@ -0,0 +1,41 @@
package federatingdb
import (
func (f *federatingDB) Block(ctx context.Context, block vocab.ActivityStreamsBlock) error {
receivingAccount, _ := extractFromCtx(ctx)
if receivingAccount == nil {
// If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
gtsBlock, err := f.typeConverter.ASBlockToBlock(ctx, block)
if err != nil {
return fmt.Errorf("Block: could not convert Block to gts model block")
gtsBlock.ID = id.NewULID()
if err := f.state.DB.PutBlock(ctx, gtsBlock); err != nil {
return fmt.Errorf("Block: database error inserting block: %s", err)
f.state.Workers.EnqueueFederator(ctx, messages.FromFederator{
APObjectType: ap.ActivityBlock,
APActivityType: ap.ActivityCreate,
GTSModel: block,
ReceivingAccount: receivingAccount,
return nil
@ -21,13 +21,13 @@ import (
@ -66,9 +66,6 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
switch asType.GetTypeName() {
case ap.ActivityBlock:
return f.activityBlock(ctx, asType, receivingAccount, requestingAccount)
case ap.ActivityCreate:
return f.activityCreate(ctx, asType, receivingAccount, requestingAccount)
@ -122,41 +119,52 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec
func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
create, ok := asType.(vocab.ActivityStreamsCreate)
if !ok {
return errors.New("activityCreate: could not convert type to create")
return errors.New("activityCreate: could resolve %T to Create")
// create should have an object
// Create should have an object.
object := create.GetActivityStreamsObject()
if object == nil {
return errors.New("Create had no Object")
errs := []string{}
// iterate through the object(s) to see what we're meant to be creating
for objectIter := object.Begin(); objectIter != object.End(); objectIter = objectIter.Next() {
asObjectType := objectIter.GetType()
if asObjectType == nil {
// currently we can't do anything with just a Create of something that's not an Object with a type
// TODO: process a Create with an Object that's just a URI or something
errs = append(errs, "object of Create was not a Type")
// Iterate through the Object(s) to see what we're meant to be creating.
errs := make(gtserror.MultiError, 0, object.Len())
for iter := object.Begin(); iter != object.End(); iter = iter.Next() {
objectType := iter.GetType()
if objectType == nil {
// Currently we can't do anything with just a Create
// of something that's not an Object with a type.
errs.Append(errors.New("object of Create was not a Type"))
// we have a type -- what is it?
asObjectTypeName := asObjectType.GetTypeName()
switch asObjectTypeName {
// Process object according to its type.
// TODO: possibly add more types here.
switch typeName := objectType.GetTypeName(); typeName {
case ap.ObjectNote:
if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount); err != nil {
errs = append(errs, err.Error())
if err := f.createNote(
); err != nil {
errs = append(errs, fmt.Sprintf("received an object on a Create that we couldn't handle: %s", asObjectType.GetTypeName()))
{"receivingAccount", receivingAccount.URI},
{"requestingAccount", requestingAccount.URI},
{"typeName", typeName},
Debug("Object of Create was a type we couldn't handle")
if len(errs) != 0 {
return fmt.Errorf("activityCreate: one or more errors while processing activity: %s", strings.Join(errs, "; "))
return fmt.Errorf("activityCreate: one or more errors while processing activity: %w", errs.Combine())
return nil
@ -164,43 +172,44 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re
// createNote handles a Create activity with a Note type.
func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
l := log.WithContext(ctx).
{"receivingAccount", receivingAccount.URI},
{"requestingAccount", requestingAccount.URI},
// Check if we have a forward.
// In other words, was the note posted to our inbox by at least one actor who actually created the note, or are they just forwarding it?
forward := true
// note should have an attributedTo
// Note must have an attributedTo for us to process it.
noteAttributedTo := note.GetActivityStreamsAttributedTo()
if noteAttributedTo == nil {
return errors.New("createNote: note had no attributedTo")
// compare the attributedTo(s) with the actor who posted this to our inbox
for attributedToIter := noteAttributedTo.Begin(); attributedToIter != noteAttributedTo.End(); attributedToIter = attributedToIter.Next() {
if !attributedToIter.IsIRI() {
// Check if we have a forward. In other words, was the
// note posted to our inbox by at least one actor who
// created the note, or are they just forwarding it?
forward := true
// Compare the attributedTo(s) with the URI of the Actor
// who posted this to our inbox. If the actor who posted
// the Note to our inbox is the same as at least one of
// the creators of the Note, then it's not a forward.
for iter := noteAttributedTo.Begin(); iter != noteAttributedTo.End(); iter = iter.Next() {
if !iter.IsIRI() {
iri := attributedToIter.GetIRI()
if requestingAccount.URI == iri.String() {
// at least one creator of the note, and the actor who posted the note to our inbox, are the same, so it's not a forward
if iri := iter.GetIRI(); iri != nil && iri.String() == requestingAccount.URI {
forward = false
// If we do have a forward, we should ignore the content for now and just dereference based on the URL/ID of the note instead, to get the note straight from the horse's mouth
// If we do have a forward, we should ignore the content for
// now and just dereference based on the URL/ID of the note
// instead, to get the content straight from the poster's mouth.
if forward {
l.Trace("note is a forward")
id := note.GetJSONLDId()
if !id.IsIRI() {
// if the note id isn't an IRI, there's nothing we can do here
// If the ID isn't an IRI, then firstly this
// is weird, and secondly we can't process it.
return nil
// pass the note iri into the processor and have it do the dereferencing instead of doing it here
// Process the Note asynchronously, we're done here.
f.state.Workers.EnqueueFederator(ctx, messages.FromFederator{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
@ -209,33 +218,39 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream
GTSModel: nil,
ReceivingAccount: receivingAccount,
return nil
// if we reach this point, we know it's not a forwarded status, so proceed with processing it as normal
// If we reach this point, we know the status wasn't forwarded
// to us, but was delivered by at least one of the Actors who
// created it, so proceed with processing it as normal.
status, err := f.typeConverter.ASStatusToStatus(ctx, note)
if err != nil {
return fmt.Errorf("createNote: error converting note to status: %s", err)
return fmt.Errorf("createNote: error converting note to status: %w", err)
// id the status based on the time it was created
// id the status based on the time it was created;
// this allows for backdating of statuses.
statusID, err := id.NewULIDFromTime(status.CreatedAt)
if err != nil {
return err
return fmt.Errorf("createNote: error creating id for note: %w", err)
status.ID = statusID
if err := f.state.DB.PutStatus(ctx, status); err != nil {
if errors.Is(err, db.ErrAlreadyExists) {
// the status already exists in the database, which means we've already handled everything else,
// so we can just return nil here and be done with it.
// The status already exists in the database, which
// means we've already handled everything else, so
// we can just return nil here and be done with it.
return nil
// an actual error has happened
return fmt.Errorf("createNote: database error inserting status: %s", err)
// An actual error has happened.
return fmt.Errorf("createNote: database error inserting status: %w", err)
// Do further processing asynchronously.
f.state.Workers.EnqueueFederator(ctx, messages.FromFederator{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
@ -34,6 +34,7 @@ type DB interface {
Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error
Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error
Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error
Block(ctx context.Context, block vocab.ActivityStreamsBlock) error
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.
Normal file
Normal file
@ -0,0 +1,3 @@
package federatingdb
@ -378,6 +378,9 @@ func (f *federator) FederatingCallbacks(ctx context.Context) (wrapped pub.Federa
func(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
return f.FederatingDB().Announce(ctx, announce)
func(ctx context.Context, block vocab.ActivityStreamsBlock) error {
return f.FederatingDB().Block(ctx, block)
@ -21,7 +21,6 @@ import (
@ -29,20 +28,6 @@ import (
// InboxPost handles POST requests to a user's inbox for new activitypub messages.
// InboxPost returns true if the request was handled as an ActivityPub POST to an actor's inbox.
// If false, the request was not an ActivityPub request and may still be handled by the caller in another way, such as serving a web page.
// If the error is nil, then the ResponseWriter's headers and response has already been written. If a non-nil error is returned, then no response has been written.
// If the Actor was constructed with the Federated Protocol enabled, side effects will occur.
// If the Federated Protocol is not enabled, writes the http.StatusMethodNotAllowed status code in the response. No side effects occur.
func (p *Processor) InboxPost(ctx context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
return p.federator.FederatingActor().PostInbox(ctx, w, r)
// OutboxGet returns the activitypub representation of a local user's outbox.
// This contains links to PUBLIC posts made by this user.
func (p *Processor) OutboxGet(ctx context.Context, requestedUsername string, page bool, maxID string, minID string) (interface{}, gtserror.WithCode) {
Normal file
Normal file
@ -0,0 +1,37 @@
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fedi
import (
// InboxPost handles POST requests to a user's inbox for new activitypub messages.
// InboxPost returns true if the request was handled as an ActivityPub POST to an actor's inbox.
// If false, the request was not an ActivityPub request and may still be handled by the caller in another way, such as serving a web page.
// If the error is nil, then the ResponseWriter's headers and response has already been written. If a non-nil error is returned, then no response has been written.
// If the Actor was constructed with the Federated Protocol enabled, side effects will occur.
// If the Federated Protocol is not enabled, writes the http.StatusMethodNotAllowed status code in the response. No side effects occur.
func (p *Processor) InboxPost(ctx context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
return p.federator.FederatingActor().PostInbox(ctx, w, r)
Add table
Reference in a new issue