diff --git a/.golangci.yml b/.golangci.yml
index a7cba8764..75bc787dd 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -81,3 +81,7 @@ linters-settings:
# Logging via Print bypasses our logging framework.
- ^(fmt\.Print(|f|ln)|print|println)
- ^panic.*$
+
+ dupl:
+ # tokens count to trigger issue, 150 by default
+ threshold: 160
diff --git a/activitypub/activitypub.go b/activitypub/activitypub.go
new file mode 100644
index 000000000..0d80a5afe
--- /dev/null
+++ b/activitypub/activitypub.go
@@ -0,0 +1,51 @@
+package activitypub
+
+import (
+ "github.com/owncast/owncast/activitypub/crypto"
+ "github.com/owncast/owncast/activitypub/inbox"
+ "github.com/owncast/owncast/activitypub/outbox"
+ "github.com/owncast/owncast/activitypub/persistence"
+ "github.com/owncast/owncast/activitypub/workerpool"
+
+ "github.com/owncast/owncast/core/data"
+ "github.com/owncast/owncast/models"
+ log "github.com/sirupsen/logrus"
+)
+
+// Start will initialize and start the federation support.
+func Start(datastore *data.Datastore) {
+ persistence.Setup(datastore)
+ workerpool.InitOutboundWorkerPool()
+ inbox.InitInboxWorkerPool()
+ StartRouter()
+
+ // Test
+ if data.GetPrivateKey() == "" {
+ privateKey, publicKey, err := crypto.GenerateKeys()
+ _ = data.SetPrivateKey(string(privateKey))
+ _ = data.SetPublicKey(string(publicKey))
+ if err != nil {
+ log.Errorln("Unable to get private key", err)
+ }
+ }
+}
+
+// SendLive will send a "Go Live" message to followers.
+func SendLive() error {
+ return outbox.SendLive()
+}
+
+// SendPublicFederatedMessage will send an arbitrary provided message to followers.
+func SendPublicFederatedMessage(message string) error {
+ return outbox.SendPublicMessage(message)
+}
+
+// GetFollowerCount will return the local tracked follower count.
+func GetFollowerCount() (int64, error) {
+ return persistence.GetFollowerCount()
+}
+
+// GetPendingFollowRequests will return the pending follow requests.
+func GetPendingFollowRequests() ([]models.Follower, error) {
+ return persistence.GetPendingFollowRequests()
+}
diff --git a/activitypub/apmodels/activity.go b/activitypub/apmodels/activity.go
new file mode 100644
index 000000000..b4c9b5947
--- /dev/null
+++ b/activitypub/apmodels/activity.go
@@ -0,0 +1,88 @@
+package apmodels
+
+import (
+ "net/url"
+ "time"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/owncast/owncast/core/data"
+)
+
+// PrivacyAudience represents the audience for an activity.
+type PrivacyAudience = string
+
+const (
+ // PUBLIC is an audience meaning anybody can view the item.
+ PUBLIC PrivacyAudience = "https://www.w3.org/ns/activitystreams#Public"
+)
+
+// MakeCreateActivity will return a new Create activity with the provided ID.
+func MakeCreateActivity(activityID *url.URL) vocab.ActivityStreamsCreate {
+ activity := streams.NewActivityStreamsCreate()
+ id := streams.NewJSONLDIdProperty()
+ id.Set(activityID)
+ activity.SetJSONLDId(id)
+
+ // CC the public if we're not treating ActivityPub as "private".
+ if !data.GetFederationIsPrivate() {
+ public, _ := url.Parse(PUBLIC)
+ to := streams.NewActivityStreamsToProperty()
+ to.AppendIRI(public)
+ activity.SetActivityStreamsTo(to)
+
+ audience := streams.NewActivityStreamsAudienceProperty()
+ audience.AppendIRI(public)
+ activity.SetActivityStreamsAudience(audience)
+ }
+
+ return activity
+}
+
+// MakeUpdateActivity will return a new Update activity with the provided aID.
+func MakeUpdateActivity(activityID *url.URL) vocab.ActivityStreamsUpdate {
+ activity := streams.NewActivityStreamsUpdate()
+ id := streams.NewJSONLDIdProperty()
+ id.Set(activityID)
+ activity.SetJSONLDId(id)
+
+ // CC the public if we're not treating ActivityPub as "private".
+ if !data.GetFederationIsPrivate() {
+ public, _ := url.Parse(PUBLIC)
+ cc := streams.NewActivityStreamsCcProperty()
+ cc.AppendIRI(public)
+ activity.SetActivityStreamsCc(cc)
+ }
+
+ return activity
+}
+
+// MakeNote will return a new Note object.
+func MakeNote(text string, noteIRI *url.URL, attributedToIRI *url.URL) vocab.ActivityStreamsNote {
+ note := streams.NewActivityStreamsNote()
+ content := streams.NewActivityStreamsContentProperty()
+ content.AppendXMLSchemaString(text)
+ note.SetActivityStreamsContent(content)
+ id := streams.NewJSONLDIdProperty()
+ id.Set(noteIRI)
+ note.SetJSONLDId(id)
+
+ published := streams.NewActivityStreamsPublishedProperty()
+ published.Set(time.Now())
+ note.SetActivityStreamsPublished(published)
+
+ attributedTo := attributedToIRI
+ attr := streams.NewActivityStreamsAttributedToProperty()
+ attr.AppendIRI(attributedTo)
+ note.SetActivityStreamsAttributedTo(attr)
+
+ // CC the public if we're not treating ActivityPub as "private".
+ if !data.GetFederationIsPrivate() {
+ public, _ := url.Parse(PUBLIC)
+ cc := streams.NewActivityStreamsCcProperty()
+ cc.AppendIRI(public)
+ note.SetActivityStreamsCc(cc)
+ }
+
+ return note
+}
diff --git a/activitypub/apmodels/actor.go b/activitypub/apmodels/actor.go
new file mode 100644
index 000000000..bdae45073
--- /dev/null
+++ b/activitypub/apmodels/actor.go
@@ -0,0 +1,263 @@
+package apmodels
+
+import (
+ "fmt"
+ "net/url"
+ "time"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/owncast/owncast/activitypub/crypto"
+ "github.com/owncast/owncast/core/data"
+ "github.com/owncast/owncast/models"
+ log "github.com/sirupsen/logrus"
+)
+
+// ActivityPubActor represents a single actor in handling ActivityPub activity.
+type ActivityPubActor struct {
+ // ActorIRI is the IRI of the remote actor.
+ ActorIri *url.URL
+ // FollowRequestIRI is the unique identifier of the follow request.
+ FollowRequestIri *url.URL
+ // Inbox is the inbox URL of the remote follower
+ Inbox *url.URL
+ // Name is the display name of the follower.
+ Name string
+ // Username is the account username of the remote actor.
+ Username string
+ // FullUsername is the username@account.tld representation of the user.
+ FullUsername string
+ // Image is the avatar image of the Actor.
+ Image *url.URL
+ // W3IDSecurityV1PublicKey is the public key of the actor.
+ W3IDSecurityV1PublicKey vocab.W3IDSecurityV1PublicKeyProperty
+ // DisabledAt is the time, if any, this follower was blocked/removed.
+ DisabledAt *time.Time
+}
+
+// DeleteRequest represents a request for delete.
+type DeleteRequest struct {
+ ActorIri string
+}
+
+// MakeActorFromPerson takes a full ActivityPub Person and returns our internal
+// representation of an actor.
+func MakeActorFromPerson(person vocab.ActivityStreamsPerson) ActivityPubActor {
+ apActor := ActivityPubActor{
+ ActorIri: person.GetJSONLDId().Get(),
+ Inbox: person.GetActivityStreamsInbox().GetIRI(),
+ Name: person.GetActivityStreamsName().Begin().GetXMLSchemaString(),
+ Username: person.GetActivityStreamsPreferredUsername().GetXMLSchemaString(),
+ FullUsername: GetFullUsernameFromPerson(person),
+ W3IDSecurityV1PublicKey: person.GetW3IDSecurityV1PublicKey(),
+ }
+
+ if person.GetActivityStreamsIcon() != nil && person.GetActivityStreamsIcon().Len() > 0 {
+ apActor.Image = person.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI()
+ }
+
+ return apActor
+}
+
+// MakeActorFromService takes a full ActivityPub Service and returns our internal
+// representation of an actor.
+func MakeActorFromService(service vocab.ActivityStreamsService) ActivityPubActor {
+ apActor := ActivityPubActor{
+ ActorIri: service.GetJSONLDId().Get(),
+ Inbox: service.GetActivityStreamsInbox().GetIRI(),
+ Name: service.GetActivityStreamsName().Begin().GetXMLSchemaString(),
+ Username: service.GetActivityStreamsPreferredUsername().GetXMLSchemaString(),
+ FullUsername: GetFullUsernameFromService(service),
+ W3IDSecurityV1PublicKey: service.GetW3IDSecurityV1PublicKey(),
+ }
+
+ if service.GetActivityStreamsIcon() != nil && service.GetActivityStreamsIcon().Len() > 0 {
+ apActor.Image = service.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI()
+ }
+
+ return apActor
+}
+
+// MakeActorPropertyWithID will return an actor property filled with the provided IRI.
+func MakeActorPropertyWithID(idIRI *url.URL) vocab.ActivityStreamsActorProperty {
+ actor := streams.NewActivityStreamsActorProperty()
+ actor.AppendIRI(idIRI)
+ return actor
+}
+
+// MakeServiceForAccount will create a new local actor service with the the provided username.
+func MakeServiceForAccount(accountName string) vocab.ActivityStreamsService {
+ actorIRI := MakeLocalIRIForAccount(accountName)
+
+ person := streams.NewActivityStreamsService()
+ nameProperty := streams.NewActivityStreamsNameProperty()
+ nameProperty.AppendXMLSchemaString(data.GetServerName())
+ person.SetActivityStreamsName(nameProperty)
+
+ preferredUsernameProperty := streams.NewActivityStreamsPreferredUsernameProperty()
+ preferredUsernameProperty.SetXMLSchemaString(accountName)
+ person.SetActivityStreamsPreferredUsername(preferredUsernameProperty)
+
+ inboxIRI := MakeLocalIRIForResource("/user/" + accountName + "/inbox")
+
+ inboxProp := streams.NewActivityStreamsInboxProperty()
+ inboxProp.SetIRI(inboxIRI)
+ person.SetActivityStreamsInbox(inboxProp)
+
+ needsFollowApprovalProperty := streams.NewActivityStreamsManuallyApprovesFollowersProperty()
+ needsFollowApprovalProperty.Set(data.GetFederationIsPrivate())
+ person.SetActivityStreamsManuallyApprovesFollowers(needsFollowApprovalProperty)
+
+ outboxIRI := MakeLocalIRIForResource("/user/" + accountName + "/outbox")
+
+ outboxProp := streams.NewActivityStreamsOutboxProperty()
+ outboxProp.SetIRI(outboxIRI)
+ person.SetActivityStreamsOutbox(outboxProp)
+
+ id := streams.NewJSONLDIdProperty()
+ id.Set(actorIRI)
+ person.SetJSONLDId(id)
+
+ publicKey := crypto.GetPublicKey(actorIRI)
+
+ publicKeyProp := streams.NewW3IDSecurityV1PublicKeyProperty()
+ publicKeyType := streams.NewW3IDSecurityV1PublicKey()
+
+ pubKeyIDProp := streams.NewJSONLDIdProperty()
+ pubKeyIDProp.Set(publicKey.ID)
+
+ publicKeyType.SetJSONLDId(pubKeyIDProp)
+
+ ownerProp := streams.NewW3IDSecurityV1OwnerProperty()
+ ownerProp.SetIRI(publicKey.Owner)
+ publicKeyType.SetW3IDSecurityV1Owner(ownerProp)
+
+ publicKeyPemProp := streams.NewW3IDSecurityV1PublicKeyPemProperty()
+ publicKeyPemProp.Set(publicKey.PublicKeyPem)
+ publicKeyType.SetW3IDSecurityV1PublicKeyPem(publicKeyPemProp)
+ publicKeyProp.AppendW3IDSecurityV1PublicKey(publicKeyType)
+ person.SetW3IDSecurityV1PublicKey(publicKeyProp)
+
+ if t, err := data.GetServerInitTime(); t != nil {
+ publishedDateProp := streams.NewActivityStreamsPublishedProperty()
+ publishedDateProp.Set(t.Time)
+ person.SetActivityStreamsPublished(publishedDateProp)
+ } else {
+ log.Errorln("unable to fetch server init time", err)
+ }
+
+ // Profile properties
+
+ // Avatar
+ userAvatarURLString := data.GetServerURL() + "/logo/external"
+ userAvatarURL, err := url.Parse(userAvatarURLString)
+ if err != nil {
+ log.Errorln("unable to parse user avatar url", userAvatarURLString, err)
+ }
+
+ image := streams.NewActivityStreamsImage()
+ imgProp := streams.NewActivityStreamsUrlProperty()
+ imgProp.AppendIRI(userAvatarURL)
+ image.SetActivityStreamsUrl(imgProp)
+ icon := streams.NewActivityStreamsIconProperty()
+ icon.AppendActivityStreamsImage(image)
+ person.SetActivityStreamsIcon(icon)
+
+ // Actor URL
+ urlProperty := streams.NewActivityStreamsUrlProperty()
+ urlProperty.AppendIRI(actorIRI)
+ person.SetActivityStreamsUrl(urlProperty)
+
+ // Profile header
+ headerImage := streams.NewActivityStreamsImage()
+ headerImgPropURL := streams.NewActivityStreamsUrlProperty()
+ headerImgPropURL.AppendIRI(userAvatarURL)
+ headerImage.SetActivityStreamsUrl(headerImgPropURL)
+ headerImageProp := streams.NewActivityStreamsImageProperty()
+ headerImageProp.AppendActivityStreamsImage(headerImage)
+ person.SetActivityStreamsImage(headerImageProp)
+
+ // Profile bio
+ summaryProperty := streams.NewActivityStreamsSummaryProperty()
+ summaryProperty.AppendXMLSchemaString(data.GetServerSummary())
+ person.SetActivityStreamsSummary(summaryProperty)
+
+ // Links
+ for _, link := range data.GetSocialHandles() {
+ addMetadataLinkToProfile(person, link.Platform, link.URL)
+ }
+
+ // Discoverable
+ discoverableProperty := streams.NewTootDiscoverableProperty()
+ discoverableProperty.Set(true)
+ person.SetTootDiscoverable(discoverableProperty)
+
+ // Followers
+ followersProperty := streams.NewActivityStreamsFollowersProperty()
+ followersURL := *actorIRI
+ followersURL.Path = actorIRI.Path + "/followers"
+ followersProperty.SetIRI(&followersURL)
+ person.SetActivityStreamsFollowers(followersProperty)
+
+ // Tags
+ tagProp := streams.NewActivityStreamsTagProperty()
+ for _, tagString := range data.GetServerMetadataTags() {
+ hashtag := MakeHashtag(tagString)
+ tagProp.AppendTootHashtag(hashtag)
+ }
+
+ person.SetActivityStreamsTag(tagProp)
+
+ // Work around an issue where a single attachment will not serialize
+ // as an array, so add another item to the mix.
+ if len(data.GetSocialHandles()) == 1 {
+ addMetadataLinkToProfile(person, "Owncast", "https://owncast.online")
+ }
+
+ return person
+}
+
+// GetFullUsernameFromPerson will return the user@host.tld formatted user given a person object.
+func GetFullUsernameFromPerson(person vocab.ActivityStreamsPerson) string {
+ hostname := person.GetJSONLDId().GetIRI().Hostname()
+ username := person.GetActivityStreamsPreferredUsername().GetXMLSchemaString()
+ fullUsername := fmt.Sprintf("%s@%s", username, hostname)
+
+ return fullUsername
+}
+
+// GetFullUsernameFromService will return the user@host.tld formatted user given a service object.
+func GetFullUsernameFromService(person vocab.ActivityStreamsService) string {
+ hostname := person.GetJSONLDId().GetIRI().Hostname()
+ username := person.GetActivityStreamsPreferredUsername().GetXMLSchemaString()
+ fullUsername := fmt.Sprintf("%s@%s", username, hostname)
+
+ return fullUsername
+}
+
+func addMetadataLinkToProfile(profile vocab.ActivityStreamsService, name string, url string) {
+ attachments := profile.GetActivityStreamsAttachment()
+ if attachments == nil {
+ attachments = streams.NewActivityStreamsAttachmentProperty()
+ }
+
+ displayName := name
+ socialHandle := models.GetSocialHandle(name)
+ if socialHandle != nil {
+ displayName = socialHandle.Platform
+ }
+
+ linkValue := fmt.Sprintf("%s", url, url)
+
+ attachment := streams.NewActivityStreamsObject()
+ attachmentProp := streams.NewJSONLDTypeProperty()
+ attachmentProp.AppendXMLSchemaString("PropertyValue")
+ attachment.SetJSONLDType(attachmentProp)
+ attachmentName := streams.NewActivityStreamsNameProperty()
+ attachmentName.AppendXMLSchemaString(displayName)
+ attachment.SetActivityStreamsName(attachmentName)
+ attachment.GetUnknownProperties()["value"] = linkValue
+
+ attachments.AppendActivityStreamsObject(attachment)
+ profile.SetActivityStreamsAttachment(attachments)
+}
diff --git a/activitypub/apmodels/actor_test.go b/activitypub/apmodels/actor_test.go
new file mode 100644
index 000000000..fb52f3e62
--- /dev/null
+++ b/activitypub/apmodels/actor_test.go
@@ -0,0 +1,168 @@
+package apmodels
+
+import (
+ "io/ioutil"
+ "net/url"
+ "os"
+ "testing"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/owncast/owncast/core/data"
+)
+
+func makeFakeService() vocab.ActivityStreamsService {
+ iri, _ := url.Parse("https://fake.fediverse.server/user/mrfoo")
+ name := "Mr Foo"
+ username := "foodawg"
+ inbox, _ := url.Parse("https://fake.fediverse.server/user/mrfoo/inbox")
+ userAvatarURL, _ := url.Parse("https://fake.fediverse.server/user/mrfoo/avatar.png")
+
+ service := streams.NewActivityStreamsService()
+
+ id := streams.NewJSONLDIdProperty()
+ id.Set(iri)
+ service.SetJSONLDId(id)
+
+ nameProperty := streams.NewActivityStreamsNameProperty()
+ nameProperty.AppendXMLSchemaString(name)
+ service.SetActivityStreamsName(nameProperty)
+
+ preferredUsernameProperty := streams.NewActivityStreamsPreferredUsernameProperty()
+ preferredUsernameProperty.SetXMLSchemaString(username)
+ service.SetActivityStreamsPreferredUsername(preferredUsernameProperty)
+
+ inboxProp := streams.NewActivityStreamsInboxProperty()
+ inboxProp.SetIRI(inbox)
+ service.SetActivityStreamsInbox(inboxProp)
+
+ image := streams.NewActivityStreamsImage()
+ imgProp := streams.NewActivityStreamsUrlProperty()
+ imgProp.AppendIRI(userAvatarURL)
+ image.SetActivityStreamsUrl(imgProp)
+ icon := streams.NewActivityStreamsIconProperty()
+ icon.AppendActivityStreamsImage(image)
+ service.SetActivityStreamsIcon(icon)
+
+ return service
+}
+
+func TestMain(m *testing.M) {
+ dbFile, err := ioutil.TempFile(os.TempDir(), "owncast-test-db.db")
+ if err != nil {
+ panic(err)
+ }
+
+ data.SetupPersistence(dbFile.Name())
+ data.SetServerURL("https://my.cool.site.biz")
+
+ m.Run()
+}
+
+func TestMakeActorFromService(t *testing.T) {
+ service := makeFakeService()
+ actor := MakeActorFromService(service)
+
+ if actor.ActorIri != service.GetJSONLDId().GetIRI() {
+ t.Errorf("actor.ID = %v, want %v", actor.ActorIri, service.GetJSONLDId().GetIRI())
+ }
+
+ if actor.Name != service.GetActivityStreamsName().At(0).GetXMLSchemaString() {
+ t.Errorf("actor.Name = %v, want %v", actor.Name, service.GetActivityStreamsName().At(0).GetXMLSchemaString())
+ }
+
+ if actor.Username != service.GetActivityStreamsPreferredUsername().GetXMLSchemaString() {
+ t.Errorf("actor.Username = %v, want %v", actor.Username, service.GetActivityStreamsPreferredUsername().GetXMLSchemaString())
+ }
+
+ if actor.Inbox != service.GetActivityStreamsInbox().GetIRI() {
+ t.Errorf("actor.Inbox = %v, want %v", actor.Inbox.String(), service.GetActivityStreamsInbox().GetIRI())
+ }
+
+ if actor.Image != service.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().At(0).GetIRI() {
+ t.Errorf("actor.Image = %v, want %v", actor.Image, service.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().At(0).GetIRI())
+ }
+}
+
+func TestMakeActorPropertyWithID(t *testing.T) {
+ iri, _ := url.Parse("https://fake.fediverse.server/user/mrfoo")
+ actor := MakeActorPropertyWithID(iri)
+
+ if actor.Begin().GetIRI() != iri {
+ t.Errorf("actor.IRI = %v, want %v", actor.Begin().GetIRI(), iri)
+ }
+}
+
+func TestGetFullUsernameFromPerson(t *testing.T) {
+ expected := "foodawg@fake.fediverse.server"
+ person := makeFakeService()
+ username := GetFullUsernameFromService(person)
+
+ if username != expected {
+ t.Errorf("actor.Username = %v, want %v", username, expected)
+ }
+}
+
+func TestAddMetadataLinkToProfile(t *testing.T) {
+ person := makeFakeService()
+ addMetadataLinkToProfile(person, "my site", "https://my.cool.site.biz")
+ attchment := person.GetActivityStreamsAttachment().At(0)
+
+ nameValue := attchment.GetActivityStreamsObject().GetActivityStreamsName().At(0).GetXMLSchemaString()
+ expected := "my site"
+ if nameValue != expected {
+ t.Errorf("attachment name = %v, want %v", nameValue, expected)
+ }
+
+ propertyValue := attchment.GetActivityStreamsObject().GetUnknownProperties()["value"]
+ expected = `https://my.cool.site.biz`
+ if propertyValue != expected {
+ t.Errorf("attachment value = %v, want %v", propertyValue, expected)
+ }
+}
+
+func TestMakeServiceForAccount(t *testing.T) {
+ person := MakeServiceForAccount("accountname")
+ expectedIRI := "https://my.cool.site.biz/federation/user/accountname"
+ if person.GetJSONLDId().Get().String() != expectedIRI {
+ t.Errorf("actor.IRI = %v, want %v", person.GetJSONLDId().Get().String(), expectedIRI)
+ }
+
+ if person.GetActivityStreamsPreferredUsername().GetXMLSchemaString() != "accountname" {
+ t.Errorf("actor.PreferredUsername = %v, want %v", person.GetActivityStreamsPreferredUsername().GetXMLSchemaString(), expectedIRI)
+ }
+
+ expectedInbox := "https://my.cool.site.biz/federation/user/accountname/inbox"
+ if person.GetActivityStreamsInbox().GetIRI().String() != expectedInbox {
+ t.Errorf("actor.Inbox = %v, want %v", person.GetActivityStreamsInbox().GetIRI().String(), expectedInbox)
+ }
+
+ expectedOutbox := "https://my.cool.site.biz/federation/user/accountname/outbox"
+ if person.GetActivityStreamsOutbox().GetIRI().String() != expectedOutbox {
+ t.Errorf("actor.Outbox = %v, want %v", person.GetActivityStreamsOutbox().GetIRI().String(), expectedOutbox)
+ }
+
+ expectedFollowers := "https://my.cool.site.biz/federation/user/accountname/followers"
+ if person.GetActivityStreamsFollowers().GetIRI().String() != expectedFollowers {
+ t.Errorf("actor.Followers = %v, want %v", person.GetActivityStreamsFollowers().GetIRI().String(), expectedFollowers)
+ }
+
+ expectedName := "Owncast"
+ if person.GetActivityStreamsName().Begin().GetXMLSchemaString() != expectedName {
+ t.Errorf("actor.Name = %v, want %v", person.GetActivityStreamsName().Begin().GetXMLSchemaString(), expectedName)
+ }
+
+ expectedAvatar := "https://my.cool.site.biz/logo/external"
+ if person.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI().String() != expectedAvatar {
+ t.Errorf("actor.Avatar = %v, want %v", person.GetActivityStreamsIcon().At(0).GetActivityStreamsImage().GetActivityStreamsUrl().Begin().GetIRI().String(), expectedAvatar)
+ }
+
+ expectedSummary := "Welcome to your new Owncast server! This description can be changed in the admin. Visit https://owncast.online/docs/configuration/ to learn more."
+ if person.GetActivityStreamsSummary().At(0).GetXMLSchemaString() != expectedSummary {
+ t.Errorf("actor.Summary = %v, want %v", person.GetActivityStreamsSummary().At(0).GetXMLSchemaString(), expectedSummary)
+ }
+
+ if person.GetActivityStreamsUrl().At(0).GetIRI().String() != expectedIRI {
+ t.Errorf("actor.URL = %v, want %v", person.GetActivityStreamsUrl().At(0).GetIRI().String(), expectedIRI)
+ }
+}
diff --git a/activitypub/apmodels/hashtag.go b/activitypub/apmodels/hashtag.go
new file mode 100644
index 000000000..b7aca1d44
--- /dev/null
+++ b/activitypub/apmodels/hashtag.go
@@ -0,0 +1,24 @@
+package apmodels
+
+import (
+ "net/url"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+)
+
+// MakeHashtag will create and return a mastodon toot hashtag object with the provided name.
+func MakeHashtag(name string) vocab.TootHashtag {
+ u, _ := url.Parse("https://directory.owncast.online/tags/" + name)
+
+ hashtag := streams.NewTootHashtag()
+ hashtagName := streams.NewActivityStreamsNameProperty()
+ hashtagName.AppendXMLSchemaString("#" + name)
+ hashtag.SetActivityStreamsName(hashtagName)
+
+ hashtagHref := streams.NewActivityStreamsHrefProperty()
+ hashtagHref.Set(u)
+ hashtag.SetActivityStreamsHref(hashtagHref)
+
+ return hashtag
+}
diff --git a/activitypub/apmodels/inboxRequest.go b/activitypub/apmodels/inboxRequest.go
new file mode 100644
index 000000000..4a0c9f36b
--- /dev/null
+++ b/activitypub/apmodels/inboxRequest.go
@@ -0,0 +1,10 @@
+package apmodels
+
+import "net/http"
+
+// InboxRequest represents an inbound request to the ActivityPub inbox.
+type InboxRequest struct {
+ Request *http.Request
+ ForLocalAccount string
+ Body []byte
+}
diff --git a/activitypub/apmodels/message.go b/activitypub/apmodels/message.go
new file mode 100644
index 000000000..67d124982
--- /dev/null
+++ b/activitypub/apmodels/message.go
@@ -0,0 +1,50 @@
+package apmodels
+
+import (
+ "net/url"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+)
+
+// CreateCreateActivity will create a new Create Activity model with the provided ID and IRI.
+func CreateCreateActivity(id string, localAccountIRI *url.URL) vocab.ActivityStreamsCreate {
+ objectID := MakeLocalIRIForResource(id)
+ message := MakeCreateActivity(objectID)
+
+ actorProp := streams.NewActivityStreamsActorProperty()
+ actorProp.AppendIRI(localAccountIRI)
+ message.SetActivityStreamsActor(actorProp)
+
+ return message
+}
+
+// AddImageAttachmentToNote will add the provided image URL to the provided note object.
+func AddImageAttachmentToNote(note vocab.ActivityStreamsNote, image string) {
+ imageURL, err := url.Parse(image)
+ if err != nil {
+ return
+ }
+
+ attachments := note.GetActivityStreamsAttachment()
+ if attachments == nil {
+ attachments = streams.NewActivityStreamsAttachmentProperty()
+ }
+
+ urlProp := streams.NewActivityStreamsUrlProperty()
+ urlProp.AppendIRI(imageURL)
+
+ apImage := streams.NewActivityStreamsImage()
+ apImage.SetActivityStreamsUrl(urlProp)
+
+ imageProp := streams.NewActivityStreamsImageProperty()
+ imageProp.AppendActivityStreamsImage(apImage)
+
+ imageDescription := streams.NewActivityStreamsContentProperty()
+ imageDescription.AppendXMLSchemaString("Live stream preview")
+ apImage.SetActivityStreamsContent(imageDescription)
+
+ attachments.AppendActivityStreamsImage(apImage)
+
+ note.SetActivityStreamsAttachment(attachments)
+}
diff --git a/activitypub/apmodels/utils.go b/activitypub/apmodels/utils.go
new file mode 100644
index 000000000..056d527b0
--- /dev/null
+++ b/activitypub/apmodels/utils.go
@@ -0,0 +1,62 @@
+package apmodels
+
+import (
+ "encoding/json"
+ "net/url"
+ "path"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/owncast/owncast/core/data"
+ log "github.com/sirupsen/logrus"
+)
+
+// MakeRemoteIRIForResource will create an IRI for a remote location.
+func MakeRemoteIRIForResource(resourcePath string, host string) (*url.URL, error) {
+ generatedURL := "https://" + host
+ u, err := url.Parse(generatedURL)
+ if err != nil {
+ return nil, err
+ }
+
+ u.Path = path.Join(u.Path, "federation", resourcePath)
+
+ return u, nil
+}
+
+// MakeLocalIRIForResource will create an IRI for the local server.
+func MakeLocalIRIForResource(resourcePath string) *url.URL {
+ host := data.GetServerURL()
+ u, err := url.Parse(host)
+ if err != nil {
+ log.Errorln("unable to parse local IRI url", host, err)
+ return nil
+ }
+
+ u.Path = path.Join(u.Path, "federation", resourcePath)
+
+ return u
+}
+
+// MakeLocalIRIForAccount will return a full IRI for the local server account username.
+func MakeLocalIRIForAccount(account string) *url.URL {
+ host := data.GetServerURL()
+ u, err := url.Parse(host)
+ if err != nil {
+ log.Errorln("unable to parse local IRI account server url", err)
+ return nil
+ }
+
+ u.Path = path.Join(u.Path, "federation", "user", account)
+
+ return u
+}
+
+// Serialize will serialize an ActivityPub object to a byte slice.
+func Serialize(obj vocab.Type) ([]byte, error) {
+ var jsonmap map[string]interface{}
+ jsonmap, _ = streams.Serialize(obj)
+ b, err := json.Marshal(jsonmap)
+
+ return b, err
+}
diff --git a/activitypub/apmodels/webfinger.go b/activitypub/apmodels/webfinger.go
new file mode 100644
index 000000000..c24199dbb
--- /dev/null
+++ b/activitypub/apmodels/webfinger.go
@@ -0,0 +1,43 @@
+package apmodels
+
+import (
+ "fmt"
+)
+
+// WebfingerResponse represents a Webfinger response.
+type WebfingerResponse struct {
+ Aliases []string `json:"aliases"`
+ Subject string `json:"subject"`
+ Links []Link `json:"links"`
+}
+
+// Link represents a Webfinger response Link entity.
+type Link struct {
+ Rel string `json:"rel"`
+ Type string `json:"type"`
+ Href string `json:"href"`
+}
+
+// MakeWebfingerResponse will create a new Webfinger response.
+func MakeWebfingerResponse(account string, inbox string, host string) WebfingerResponse {
+ accountIRI := MakeLocalIRIForAccount(account)
+
+ return WebfingerResponse{
+ Subject: fmt.Sprintf("acct:%s@%s", account, host),
+ Aliases: []string{
+ accountIRI.String(),
+ },
+ Links: []Link{
+ {
+ Rel: "self",
+ Type: "application/activity+json",
+ Href: accountIRI.String(),
+ },
+ {
+ Rel: "http://webfinger.net/rel/profile-page",
+ Type: "text/html",
+ Href: accountIRI.String(),
+ },
+ },
+ }
+}
diff --git a/activitypub/controllers/actors.go b/activitypub/controllers/actors.go
new file mode 100644
index 000000000..c3d758569
--- /dev/null
+++ b/activitypub/controllers/actors.go
@@ -0,0 +1,58 @@
+package controllers
+
+import (
+ "net/http"
+ "strings"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/owncast/owncast/activitypub/apmodels"
+ "github.com/owncast/owncast/activitypub/crypto"
+ "github.com/owncast/owncast/activitypub/requests"
+ "github.com/owncast/owncast/core/data"
+)
+
+// ActorHandler handles requests for a single actor.
+func ActorHandler(w http.ResponseWriter, r *http.Request) {
+ if !data.GetFederationEnabled() {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ pathComponents := strings.Split(r.URL.Path, "/")
+ accountName := pathComponents[3]
+
+ if _, valid := data.GetFederatedInboxMap()[accountName]; !valid {
+ // User is not valid
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ // If this request is for an actor's inbox then pass
+ // the request to the inbox controller.
+ if len(pathComponents) == 5 && pathComponents[4] == "inbox" {
+ InboxHandler(w, r)
+ return
+ } else if len(pathComponents) == 5 && pathComponents[4] == "outbox" {
+ OutboxHandler(w, r)
+ return
+ } else if len(pathComponents) == 5 && pathComponents[4] == "followers" {
+ // followers list
+ FollowersHandler(w, r)
+ return
+ } else if len(pathComponents) == 5 && pathComponents[4] == "following" {
+ // following list (none)
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
+ publicKey := crypto.GetPublicKey(actorIRI)
+ person := apmodels.MakeServiceForAccount(accountName)
+
+ if err := requests.WriteStreamResponse(person, w, publicKey); err != nil {
+ log.Errorln("unable to write stream response for actor handler", err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+}
diff --git a/activitypub/controllers/followers.go b/activitypub/controllers/followers.go
new file mode 100644
index 000000000..22e60c6dc
--- /dev/null
+++ b/activitypub/controllers/followers.go
@@ -0,0 +1,166 @@
+package controllers
+
+import (
+ "fmt"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+
+ "github.com/pkg/errors"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/go-fed/activity/streams"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/owncast/owncast/activitypub/apmodels"
+ "github.com/owncast/owncast/activitypub/crypto"
+ "github.com/owncast/owncast/activitypub/persistence"
+ "github.com/owncast/owncast/activitypub/requests"
+ "github.com/owncast/owncast/core/data"
+)
+
+const (
+ followersPageSize = 50
+)
+
+// FollowersHandler will return the list of remote followers on the Fediverse.
+func FollowersHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ var response interface{}
+ var err error
+ if r.URL.Query().Get("page") != "" {
+ response, err = getFollowersPage(r.URL.Query().Get("page"), r)
+ } else {
+ response, err = getInitialFollowersRequest(r)
+ }
+
+ if response == nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ if err != nil {
+ _, _ = w.Write([]byte(err.Error()))
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ pathComponents := strings.Split(r.URL.Path, "/")
+ accountName := pathComponents[3]
+ actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
+ publicKey := crypto.GetPublicKey(actorIRI)
+
+ if err := requests.WriteStreamResponse(response.(vocab.Type), w, publicKey); err != nil {
+ log.Errorln("unable to write stream response for followers handler", err)
+ }
+}
+
+func getInitialFollowersRequest(r *http.Request) (vocab.ActivityStreamsOrderedCollection, error) {
+ followerCount, _ := persistence.GetFollowerCount()
+ collection := streams.NewActivityStreamsOrderedCollection()
+ idProperty := streams.NewJSONLDIdProperty()
+ id, err := createPageURL(r, nil)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to create followers page property")
+ }
+ idProperty.SetIRI(id)
+ collection.SetJSONLDId(idProperty)
+
+ totalItemsProperty := streams.NewActivityStreamsTotalItemsProperty()
+ totalItemsProperty.Set(int(followerCount))
+ collection.SetActivityStreamsTotalItems(totalItemsProperty)
+
+ first := streams.NewActivityStreamsFirstProperty()
+ page := "1"
+ firstIRI, err := createPageURL(r, &page)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to create first page property")
+ }
+
+ first.SetIRI(firstIRI)
+ collection.SetActivityStreamsFirst(first)
+
+ return collection, nil
+}
+
+func getFollowersPage(page string, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
+ pageInt, err := strconv.Atoi(page)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to parse page number")
+ }
+
+ followerCount, err := persistence.GetFollowerCount()
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to get follower count")
+ }
+
+ followers, err := persistence.GetFederationFollowers(followersPageSize, (pageInt-1)*followersPageSize)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to get federation followers")
+ }
+
+ collectionPage := streams.NewActivityStreamsOrderedCollectionPage()
+ idProperty := streams.NewJSONLDIdProperty()
+ id, err := createPageURL(r, &page)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to create followers page ID")
+ }
+ idProperty.SetIRI(id)
+ collectionPage.SetJSONLDId(idProperty)
+
+ orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
+
+ for _, follower := range followers {
+ u, _ := url.Parse(follower.ActorIRI)
+ orderedItems.AppendIRI(u)
+ }
+ collectionPage.SetActivityStreamsOrderedItems(orderedItems)
+
+ partOf := streams.NewActivityStreamsPartOfProperty()
+ partOfIRI, err := createPageURL(r, nil)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to create partOf property for followers page")
+ }
+
+ partOf.SetIRI(partOfIRI)
+ collectionPage.SetActivityStreamsPartOf(partOf)
+
+ if pageInt*followersPageSize < int(followerCount) {
+ next := streams.NewActivityStreamsNextProperty()
+ nextPage := fmt.Sprintf("%d", pageInt+1)
+ nextIRI, err := createPageURL(r, &nextPage)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to create next page property")
+ }
+
+ next.SetIRI(nextIRI)
+ collectionPage.SetActivityStreamsNext(next)
+ }
+
+ return collectionPage, nil
+}
+
+func createPageURL(r *http.Request, page *string) (*url.URL, error) {
+ domain := data.GetServerURL()
+ if domain == "" {
+ return nil, errors.New("unable to get server URL")
+ }
+
+ pageURL, err := url.Parse(domain)
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to parse server URL")
+ }
+
+ if page != nil {
+ query := pageURL.Query()
+ query.Add("page", *page)
+ pageURL.RawQuery = query.Encode()
+ }
+ pageURL.Path = r.URL.Path
+
+ return pageURL, nil
+}
diff --git a/activitypub/controllers/inbox.go b/activitypub/controllers/inbox.go
new file mode 100644
index 000000000..29955d47d
--- /dev/null
+++ b/activitypub/controllers/inbox.go
@@ -0,0 +1,56 @@
+package controllers
+
+import (
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/owncast/owncast/activitypub/apmodels"
+ "github.com/owncast/owncast/activitypub/inbox"
+ "github.com/owncast/owncast/core/data"
+
+ log "github.com/sirupsen/logrus"
+)
+
+// InboxHandler handles inbound federated requests.
+func InboxHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method == http.MethodPost {
+ acceptInboxRequest(w, r)
+ } else {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+}
+
+func acceptInboxRequest(w http.ResponseWriter, r *http.Request) {
+ if !data.GetFederationEnabled() {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ urlPathComponents := strings.Split(r.URL.Path, "/")
+ var forLocalAccount string
+ if len(urlPathComponents) == 5 {
+ forLocalAccount = urlPathComponents[3]
+ } else {
+ log.Errorln("Unable to determine username from url path")
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ // The account this request is for must match the account name we have set
+ // for federation.
+ if forLocalAccount != data.GetFederationUsername() {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ data, err := io.ReadAll(r.Body)
+ if err != nil {
+ log.Errorln("Unable to read inbox request payload", err)
+ return
+ }
+
+ inboxRequest := apmodels.InboxRequest{Request: r, ForLocalAccount: forLocalAccount, Body: data}
+ inbox.AddToQueue(inboxRequest)
+ w.WriteHeader(http.StatusAccepted)
+}
diff --git a/activitypub/controllers/nodeinfo.go b/activitypub/controllers/nodeinfo.go
new file mode 100644
index 000000000..22c1fa6e3
--- /dev/null
+++ b/activitypub/controllers/nodeinfo.go
@@ -0,0 +1,285 @@
+package controllers
+
+import (
+ "fmt"
+ "net/http"
+ "net/url"
+
+ "github.com/owncast/owncast/activitypub/apmodels"
+ "github.com/owncast/owncast/activitypub/crypto"
+ "github.com/owncast/owncast/activitypub/persistence"
+ "github.com/owncast/owncast/activitypub/requests"
+ "github.com/owncast/owncast/config"
+ "github.com/owncast/owncast/core/data"
+ log "github.com/sirupsen/logrus"
+)
+
+// NodeInfoController returns the V1 node info response.
+func NodeInfoController(w http.ResponseWriter, r *http.Request) {
+ type links struct {
+ Rel string `json:"rel"`
+ Href string `json:"href"`
+ }
+
+ type response struct {
+ Links []links `json:"links"`
+ }
+
+ if !data.GetFederationEnabled() {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ serverURL := data.GetServerURL()
+ if serverURL == "" {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ v2, err := url.Parse(serverURL)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ v2.Path = "nodeinfo/2.0"
+
+ res := response{
+ Links: []links{
+ {
+ Rel: "http://nodeinfo.diaspora.software/ns/schema/2.0",
+ Href: v2.String(),
+ },
+ },
+ }
+ if err := writeResponse(res, w); err != nil {
+ log.Errorln(err)
+ }
+}
+
+// NodeInfoV2Controller returns the V2 node info response.
+func NodeInfoV2Controller(w http.ResponseWriter, r *http.Request) {
+ type software struct {
+ Name string `json:"name"`
+ Version string `json:"version"`
+ }
+ type users struct {
+ Total int `json:"total"`
+ ActiveMonth int `json:"activeMonth"`
+ ActiveHalfyear int `json:"activeHalfyear"`
+ }
+ type usage struct {
+ Users users `json:"users"`
+ LocalPosts int `json:"localPosts"`
+ }
+ type response struct {
+ Version string `json:"version"`
+ Software software `json:"software"`
+ Protocols []string `json:"protocols"`
+ Usage usage `json:"usage"`
+ OpenRegistrations bool `json:"openRegistrations"`
+ }
+
+ if !data.GetFederationEnabled() {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ localPostCount, _ := persistence.GetLocalPostCount()
+
+ res := response{
+ Version: "2.0",
+ Software: software{
+ Name: "Owncast",
+ Version: config.VersionNumber,
+ },
+ Usage: usage{
+ Users: users{
+ Total: 1,
+ ActiveMonth: 1,
+ ActiveHalfyear: 1,
+ },
+ LocalPosts: int(localPostCount),
+ },
+ OpenRegistrations: false,
+ Protocols: []string{"activitypub"},
+ }
+
+ if err := writeResponse(res, w); err != nil {
+ log.Errorln(err)
+ }
+}
+
+// XNodeInfo2Controller returns the x-nodeinfo2.
+func XNodeInfo2Controller(w http.ResponseWriter, r *http.Request) {
+ type Organization struct {
+ Name string `json:"name"`
+ Contact string `json:"contact"`
+ }
+ type Server struct {
+ BaseURL string `json:"baseUrl"`
+ Version string `json:"version"`
+ Name string `json:"name"`
+ Software string `json:"software"`
+ }
+ type Services struct {
+ Outbound []string `json:"outbound"`
+ Inbound []string `json:"inbound"`
+ }
+ type Users struct {
+ ActiveWeek int `json:"activeWeek"`
+ Total int `json:"total"`
+ ActiveMonth int `json:"activeMonth"`
+ ActiveHalfyear int `json:"activeHalfyear"`
+ }
+ type Usage struct {
+ Users Users `json:"users"`
+ LocalPosts int `json:"localPosts"`
+ LocalComments int `json:"localComments"`
+ }
+ type response struct {
+ Organization Organization `json:"organization"`
+ Server Server `json:"server"`
+ Services Services `json:"services"`
+ Protocols []string `json:"protocols"`
+ Version string `json:"version"`
+ OpenRegistrations bool `json:"openRegistrations"`
+ Usage Usage `json:"usage"`
+ }
+
+ if !data.GetFederationEnabled() {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ serverURL := data.GetServerURL()
+ if serverURL == "" {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ localPostCount, _ := persistence.GetLocalPostCount()
+
+ res := &response{
+ Organization: Organization{
+ Name: data.GetServerName(),
+ Contact: serverURL,
+ },
+ Server: Server{
+ BaseURL: serverURL,
+ Version: config.VersionNumber,
+ Name: "owncast",
+ Software: "owncast",
+ },
+ Services: Services{
+ Inbound: []string{"activitypub"},
+ Outbound: []string{"activitypub"},
+ },
+ Protocols: []string{"activitypub"},
+ Version: config.VersionNumber,
+ Usage: Usage{
+ Users: Users{
+ ActiveWeek: 1,
+ Total: 1,
+ ActiveMonth: 1,
+ ActiveHalfyear: 1,
+ },
+
+ LocalPosts: int(localPostCount),
+ LocalComments: 0,
+ },
+ }
+
+ if err := writeResponse(res, w); err != nil {
+ log.Errorln(err)
+ }
+}
+
+// InstanceV1Controller returns the v1 instance details.
+func InstanceV1Controller(w http.ResponseWriter, r *http.Request) {
+ type Stats struct {
+ UserCount int `json:"user_count"`
+ StatusCount int `json:"status_count"`
+ DomainCount int `json:"domain_count"`
+ }
+ type response struct {
+ URI string `json:"uri"`
+ Title string `json:"title"`
+ ShortDescription string `json:"short_description"`
+ Description string `json:"description"`
+ Version string `json:"version"`
+ Stats Stats `json:"stats"`
+ Thumbnail string `json:"thumbnail"`
+ Languages []string `json:"languages"`
+ Registrations bool `json:"registrations"`
+ ApprovalRequired bool `json:"approval_required"`
+ InvitesEnabled bool `json:"invites_enabled"`
+ }
+
+ if !data.GetFederationEnabled() {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ serverURL := data.GetServerURL()
+ if serverURL == "" {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ thumbnail, err := url.Parse(serverURL)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ thumbnail.Path = "/logo/external"
+ localPostCount, _ := persistence.GetLocalPostCount()
+
+ res := response{
+ URI: serverURL,
+ Title: data.GetServerName(),
+ ShortDescription: data.GetServerSummary(),
+ Description: data.GetServerSummary(),
+ Version: config.GetReleaseString(),
+ Stats: Stats{
+ UserCount: 1,
+ StatusCount: int(localPostCount),
+ DomainCount: 0,
+ },
+ Thumbnail: thumbnail.String(),
+ Registrations: false,
+ ApprovalRequired: false,
+ InvitesEnabled: false,
+ }
+
+ if err := writeResponse(res, w); err != nil {
+ log.Errorln(err)
+ }
+}
+
+func writeResponse(payload interface{}, w http.ResponseWriter) error {
+ accountName := data.GetDefaultFederationUsername()
+ actorIRI := apmodels.MakeLocalIRIForAccount(accountName)
+ publicKey := crypto.GetPublicKey(actorIRI)
+
+ return requests.WritePayloadResponse(payload, w, publicKey)
+}
+
+// HostMetaController points to webfinger.
+func HostMetaController(w http.ResponseWriter, r *http.Request) {
+ serverURL := data.GetServerURL()
+ if serverURL == "" {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ res := fmt.Sprintf(`
+
%s
", title) + } + textContent = fmt.Sprintf("%s
%s
%s
%s", textContent, streamTitle, tagsString, data.GetServerURL(), data.GetServerURL()) + + activity, _, note, noteID := createBaseOutboundMessage(textContent) + + note.SetActivityStreamsTag(tagProp) + + // Attach an image along with the Federated message. + previewURL, err := url.Parse(data.GetServerURL()) + if err == nil { + var imageToAttach string + previewGif := filepath.Join(config.WebRoot, "preview.gif") + thumbnailJpg := filepath.Join(config.WebRoot, "thumbnail.jpg") + + if utils.DoesFileExists(previewGif) { + imageToAttach = "preview.gif" + } else if utils.DoesFileExists(thumbnailJpg) { + imageToAttach = "thumbnail.jpg" + } + if imageToAttach != "" { + previewURL.Path = imageToAttach + apmodels.AddImageAttachmentToNote(note, previewURL.String()) + } + } + + if data.GetNSFW() { + // Mark content as sensitive. + sensitive := streams.NewActivityStreamsSensitiveProperty() + sensitive.AppendXMLSchemaBoolean(true) + note.SetActivityStreamsSensitive(sensitive) + } + + b, err := apmodels.Serialize(activity) + if err != nil { + log.Errorln("unable to serialize go live message activity", err) + return errors.New("unable to serialize go live message activity " + err.Error()) + } + + if err := SendToFollowers(b); err != nil { + return err + } + + if err := Add(note, noteID, true); err != nil { + return err + } + + return nil +} + +// SendPublicMessage will send a public message to all followers. +func SendPublicMessage(textContent string) error { + originalContent := textContent + textContent = utils.RenderSimpleMarkdown(textContent) + + tagProp := streams.NewActivityStreamsTagProperty() + + // Iterate through the post text and find #Hashtags. + words := strings.Split(originalContent, " ") + for _, word := range words { + if strings.HasPrefix(word, "#") { + tagWithoutHashtag := strings.TrimPrefix(word, "#") + + // Replace the instances of the tag with a link to the tag page. + tagHTML := getHashtagLinkHTMLFromTagString(tagWithoutHashtag) + textContent = strings.ReplaceAll(textContent, word, tagHTML) + + // Create Hashtag object for the tag. + hashtag := apmodels.MakeHashtag(tagWithoutHashtag) + tagProp.AppendTootHashtag(hashtag) + } + } + + activity, _, note, noteID := createBaseOutboundMessage(textContent) + note.SetActivityStreamsTag(tagProp) + + b, err := apmodels.Serialize(activity) + if err != nil { + log.Errorln("unable to serialize custom fediverse message activity", err) + return errors.New("unable to serialize custom fediverse message activity " + err.Error()) + } + + if err := SendToFollowers(b); err != nil { + return err + } + + if err := Add(note, noteID, false); err != nil { + return err + } + + return nil +} + +// nolint: unparam +func createBaseOutboundMessage(textContent string) (vocab.ActivityStreamsCreate, string, vocab.ActivityStreamsNote, string) { + localActor := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername()) + noteID := shortid.MustGenerate() + noteIRI := apmodels.MakeLocalIRIForResource(noteID) + id := shortid.MustGenerate() + activity := apmodels.CreateCreateActivity(id, localActor) + object := streams.NewActivityStreamsObjectProperty() + activity.SetActivityStreamsObject(object) + + note := apmodels.MakeNote(textContent, noteIRI, localActor) + object.AppendActivityStreamsNote(note) + + return activity, id, note, noteID +} + +// Get Hashtag HTML link for a given tag (without # prefix). +func getHashtagLinkHTMLFromTagString(baseHashtag string) string { + return fmt.Sprintf("#%s", baseHashtag, baseHashtag) +} + +// SendToFollowers will send an arbitrary payload to all follower inboxes. +func SendToFollowers(payload []byte) error { + localActor := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername()) + + followers, err := persistence.GetFederationFollowers(-1, 0) + if err != nil { + log.Errorln("unable to fetch followers to send to", err) + return errors.New("unable to fetch followers to send payload to") + } + + for _, follower := range followers { + inbox, _ := url.Parse(follower.Inbox) + req, err := requests.CreateSignedRequest(payload, inbox, localActor) + if err != nil { + log.Errorln("unable to create outbox request", follower.Inbox, err) + return errors.New("unable to create outbox request: " + follower.Inbox) + } + + workerpool.AddToOutboundQueue(req) + } + return nil +} + +// UpdateFollowersWithAccountUpdates will send an update to all followers alerting of a profile update. +func UpdateFollowersWithAccountUpdates() error { + // Don't do anything if federation is disabled. + if !data.GetFederationEnabled() { + return nil + } + + id := shortid.MustGenerate() + objectID := apmodels.MakeLocalIRIForResource(id) + activity := apmodels.MakeUpdateActivity(objectID) + + actor := streams.NewActivityStreamsPerson() + actorID := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername()) + actorIDProperty := streams.NewJSONLDIdProperty() + actorIDProperty.Set(actorID) + actor.SetJSONLDId(actorIDProperty) + + actorProperty := streams.NewActivityStreamsActorProperty() + actorProperty.AppendActivityStreamsPerson(actor) + activity.SetActivityStreamsActor(actorProperty) + + obj := streams.NewActivityStreamsObjectProperty() + obj.AppendIRI(actorID) + activity.SetActivityStreamsObject(obj) + + b, err := apmodels.Serialize(activity) + if err != nil { + log.Errorln("unable to serialize send update actor activity", err) + return errors.New("unable to serialize send update actor activity") + } + return SendToFollowers(b) +} + +// Add will save an ActivityPub object to the datastore. +func Add(item vocab.Type, id string, isLiveNotification bool) error { + iri := item.GetJSONLDId().GetIRI().String() + typeString := item.GetTypeName() + + if iri == "" { + log.Errorln("Unable to get iri from item") + return errors.New("Unable to get iri from item " + id) + } + + b, err := apmodels.Serialize(item) + if err != nil { + log.Errorln("unable to serialize model when saving to outbox", err) + return err + } + + return persistence.AddToOutbox(iri, b, typeString, isLiveNotification) +} diff --git a/activitypub/persistence/followers.go b/activitypub/persistence/followers.go new file mode 100644 index 000000000..77863038e --- /dev/null +++ b/activitypub/persistence/followers.go @@ -0,0 +1,121 @@ +package persistence + +import ( + "context" + + "github.com/owncast/owncast/db" + "github.com/owncast/owncast/models" + "github.com/owncast/owncast/utils" + log "github.com/sirupsen/logrus" +) + +func createFederationFollowersTable() { + log.Traceln("Creating federation followers table...") + + createTableSQL := `CREATE TABLE IF NOT EXISTS ap_followers ( + "iri" TEXT NOT NULL, + "inbox" TEXT NOT NULL, + "name" TEXT, + "username" TEXT NOT NULL, + "image" TEXT, + "request" TEXT NOT NULL, + "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "approved_at" TIMESTAMP, + "disabled_at" TIMESTAMP, + PRIMARY KEY (iri)); + CREATE INDEX iri_index ON ap_followers (iri); + CREATE INDEX approved_at_index ON ap_followers (approved_at);` + + stmt, err := _datastore.DB.Prepare(createTableSQL) + if err != nil { + log.Fatal(err) + } + defer stmt.Close() + _, err = stmt.Exec() + if err != nil { + log.Warnln("error executing sql creating followers table", createTableSQL, err) + } +} + +// GetFollowerCount will return the number of followers we're keeping track of. +func GetFollowerCount() (int64, error) { + ctx := context.Background() + return _datastore.GetQueries().GetFollowerCount(ctx) +} + +// GetFederationFollowers will return a slice of the followers we keep track of locally. +func GetFederationFollowers(limit int, offset int) ([]models.Follower, error) { + ctx := context.Background() + followersResult, err := _datastore.GetQueries().GetFederationFollowersWithOffset(ctx, db.GetFederationFollowersWithOffsetParams{ + Limit: int32(limit), + Offset: int32(offset), + }) + if err != nil { + return nil, err + } + + followers := make([]models.Follower, 0) + + for _, row := range followersResult { + singleFollower := models.Follower{ + Name: row.Name.String, + Username: row.Username, + Image: row.Image.String, + ActorIRI: row.Iri, + Inbox: row.Inbox, + Timestamp: utils.NullTime(row.CreatedAt), + } + + followers = append(followers, singleFollower) + } + + return followers, nil +} + +// GetPendingFollowRequests will return pending follow requests. +func GetPendingFollowRequests() ([]models.Follower, error) { + pendingFollowersResult, err := _datastore.GetQueries().GetFederationFollowerApprovalRequests(context.Background()) + if err != nil { + return nil, err + } + + followers := make([]models.Follower, 0) + + for _, row := range pendingFollowersResult { + singleFollower := models.Follower{ + Name: row.Name.String, + Username: row.Username, + Image: row.Image.String, + ActorIRI: row.Iri, + Inbox: row.Inbox, + Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true}, + } + followers = append(followers, singleFollower) + } + + return followers, nil +} + +// GetBlockedAndRejectedFollowers will return blocked and rejected followers. +func GetBlockedAndRejectedFollowers() ([]models.Follower, error) { + pendingFollowersResult, err := _datastore.GetQueries().GetRejectedAndBlockedFollowers(context.Background()) + if err != nil { + return nil, err + } + + followers := make([]models.Follower, 0) + + for _, row := range pendingFollowersResult { + singleFollower := models.Follower{ + Name: row.Name.String, + Username: row.Username, + Image: row.Image.String, + ActorIRI: row.Iri, + DisabledAt: utils.NullTime{Time: row.DisabledAt.Time, Valid: true}, + Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true}, + } + followers = append(followers, singleFollower) + } + + return followers, nil +} diff --git a/activitypub/persistence/persistence.go b/activitypub/persistence/persistence.go new file mode 100644 index 000000000..842cce712 --- /dev/null +++ b/activitypub/persistence/persistence.go @@ -0,0 +1,360 @@ +package persistence + +import ( + "context" + "database/sql" + "fmt" + "net/url" + "time" + + "github.com/go-fed/activity/streams" + "github.com/go-fed/activity/streams/vocab" + "github.com/owncast/owncast/activitypub/apmodels" + "github.com/owncast/owncast/activitypub/resolvers" + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/db" + "github.com/owncast/owncast/models" + "github.com/pkg/errors" + + log "github.com/sirupsen/logrus" +) + +var _datastore *data.Datastore + +// Setup will initialize the ActivityPub persistence layer with the provided datastore. +func Setup(datastore *data.Datastore) { + _datastore = datastore + createFederationFollowersTable() + createFederationOutboxTable() + createFederatedActivitiesTable() +} + +// AddFollow will save a follow to the datastore. +func AddFollow(follow apmodels.ActivityPubActor, approved bool) error { + log.Traceln("Saving", follow.ActorIri, "as a follower.") + var image string + if follow.Image != nil { + image = follow.Image.String() + } + return createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, approved) +} + +// RemoveFollow will remove a follow from the datastore. +func RemoveFollow(unfollow apmodels.ActivityPubActor) error { + log.Traceln("Removing", unfollow.ActorIri, "as a follower.") + return removeFollow(unfollow.ActorIri) +} + +// GetFollower will return a single follower/request given an IRI. +func GetFollower(iri string) (*apmodels.ActivityPubActor, error) { + result, err := _datastore.GetQueries().GetFollowerByIRI(context.Background(), iri) + if err != nil { + return nil, err + } + + followIRI, err := url.Parse(result.Request) + if err != nil { + return nil, errors.Wrap(err, "error parsing follow request IRI") + } + + iriURL, err := url.Parse(result.Iri) + if err != nil { + return nil, errors.Wrap(err, "error parsing actor IRI") + } + + inbox, err := url.Parse(result.Inbox) + if err != nil { + return nil, errors.Wrap(err, "error parsing acting inbox") + } + + image, _ := url.Parse(result.Image.String) + + var disabledAt *time.Time + if result.DisabledAt.Valid { + disabledAt = &result.DisabledAt.Time + } + + follower := apmodels.ActivityPubActor{ + ActorIri: iriURL, + Inbox: inbox, + Name: result.Name.String, + Username: result.Username, + Image: image, + FollowRequestIri: followIRI, + DisabledAt: disabledAt, + } + + return &follower, nil +} + +// ApprovePreviousFollowRequest will approve a follow request. +func ApprovePreviousFollowRequest(iri string) error { + return _datastore.GetQueries().ApproveFederationFollower(context.Background(), db.ApproveFederationFollowerParams{ + Iri: iri, + ApprovedAt: sql.NullTime{ + Time: time.Now(), + Valid: true, + }, + }) +} + +// BlockOrRejectFollower will block an existing follower or reject a follow request. +func BlockOrRejectFollower(iri string) error { + return _datastore.GetQueries().RejectFederationFollower(context.Background(), db.RejectFederationFollowerParams{ + Iri: iri, + DisabledAt: sql.NullTime{ + Time: time.Now(), + Valid: true, + }, + }) +} + +func createFollow(actor string, inbox string, request string, name string, username string, image string, approved bool) error { + tx, err := _datastore.DB.Begin() + if err != nil { + log.Debugln(err) + } + defer func() { + _ = tx.Rollback() + }() + + var approvedAt sql.NullTime + if approved { + approvedAt = sql.NullTime{ + Time: time.Now(), + Valid: true, + } + } + + if err = _datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), db.AddFollowerParams{ + Iri: actor, + Inbox: inbox, + Name: sql.NullString{String: name, Valid: true}, + Username: username, + Image: sql.NullString{String: image, Valid: true}, + ApprovedAt: approvedAt, + Request: request, + }); err != nil { + log.Errorln("error creating new federation follow: ", err) + } + + return tx.Commit() +} + +// UpdateFollower will update the details of a stored follower given an IRI. +func UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error { + _datastore.DbLock.Lock() + defer _datastore.DbLock.Unlock() + + tx, err := _datastore.DB.Begin() + if err != nil { + log.Debugln(err) + } + defer func() { + _ = tx.Rollback() + }() + + if err = _datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), db.UpdateFollowerByIRIParams{ + Inbox: inbox, + Name: sql.NullString{String: name, Valid: true}, + Username: username, + Image: sql.NullString{String: image, Valid: true}, + Iri: actorIRI, + }); err != nil { + return fmt.Errorf("error updating follower %s %s", actorIRI, err) + } + + return tx.Commit() +} + +func removeFollow(actor *url.URL) error { + _datastore.DbLock.Lock() + defer _datastore.DbLock.Unlock() + + tx, err := _datastore.DB.Begin() + if err != nil { + return err + } + defer func() { + _ = tx.Rollback() + }() + + if err := _datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil { + return err + } + + return tx.Commit() +} + +// createFederatedActivitiesTable will create the accepted +// activities table if needed. +func createFederatedActivitiesTable() { + createTableSQL := `CREATE TABLE IF NOT EXISTS ap_accepted_activities ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "iri" TEXT NOT NULL, + "actor" TEXT NOT NULL, + "type" TEXT NOT NULL, + "timestamp" TIMESTAMP NOT NULL + ); + CREATE INDEX iri_actor_index ON ap_accepted_activities (iri,actor);` + + stmt, err := _datastore.DB.Prepare(createTableSQL) + if err != nil { + log.Fatal("error creating inbox table", err) + } + defer stmt.Close() + if _, err := stmt.Exec(); err != nil { + log.Fatal("error creating inbound federated activities table", err) + } +} + +func createFederationOutboxTable() { + log.Traceln("Creating federation outbox table...") + createTableSQL := `CREATE TABLE IF NOT EXISTS ap_outbox ( + "iri" TEXT NOT NULL, + "value" BLOB, + "type" TEXT NOT NULL, + "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "live_notification" BOOLEAN DEFAULT FALSE, + PRIMARY KEY (iri)); + CREATE INDEX iri ON ap_outbox (iri); + CREATE INDEX type ON ap_outbox (type); + CREATE INDEX live_notification ON ap_outbox (live_notification);` + + stmt, err := _datastore.DB.Prepare(createTableSQL) + if err != nil { + log.Fatal(err) + } + defer stmt.Close() + _, err = stmt.Exec() + if err != nil { + log.Warnln("error executing sql creating outbox table", createTableSQL, err) + } +} + +// GetOutboxPostCount will return the number of posts in the outbox. +func GetOutboxPostCount() (int64, error) { + ctx := context.Background() + return _datastore.GetQueries().GetLocalPostCount(ctx) +} + +// GetOutbox will return an instance of the outbox populated by stored items. +func GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) { + collection := streams.NewActivityStreamsOrderedCollection() + orderedItems := streams.NewActivityStreamsOrderedItemsProperty() + rows, err := _datastore.GetQueries().GetOutboxWithOffset( + context.Background(), + db.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)}, + ) + if err != nil { + return collection, err + } + + for _, value := range rows { + createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error { + orderedItems.AppendActivityStreamsCreate(activity) + return nil + } + if err := resolvers.Resolve(context.Background(), value, createCallback); err != nil { + return collection, err + } + } + + return collection, nil +} + +// AddToOutbox will store a single payload to the persistence layer. +func AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error { + tx, err := _datastore.DB.Begin() + if err != nil { + log.Debugln(err) + } + defer func() { + _ = tx.Rollback() + }() + + if err = _datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), db.AddToOutboxParams{ + Iri: iri, + Value: itemData, + Type: typeString, + LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true}, + }); err != nil { + return fmt.Errorf("error creating new item in federation outbox %s", err) + } + + return tx.Commit() +} + +// GetObjectByID will return a string representation of a single object by the ID. +func GetObjectByID(id string) (string, error) { + value, err := _datastore.GetQueries().GetObjectFromOutboxByID(context.Background(), id) + return string(value), err +} + +// GetObjectByIRI will return a string representation of a single object by the IRI. +func GetObjectByIRI(iri string) (string, bool, time.Time, error) { + row, err := _datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri) + return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err +} + +// GetLocalPostCount will return the number of posts existing locally. +func GetLocalPostCount() (int64, error) { + ctx := context.Background() + return _datastore.GetQueries().GetLocalPostCount(ctx) +} + +// SaveInboundFediverseActivity will save an event to the ap_inbound_activities table. +func SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error { + if err := _datastore.GetQueries().AddToAcceptedActivities(context.Background(), db.AddToAcceptedActivitiesParams{ + Iri: objectIRI, + Actor: actorIRI, + Type: eventType, + Timestamp: timestamp, + }); err != nil { + return errors.Wrap(err, "error saving event "+objectIRI) + } + + return nil +} + +// GetInboundActivities will return a collection of saved, federated activities +// limited and offset by the values provided to support pagination. +func GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, error) { + ctx := context.Background() + rows, err := _datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, db.GetInboundActivitiesWithOffsetParams{ + Limit: int32(limit), + Offset: int32(offset), + }) + if err != nil { + return nil, err + } + + activities := make([]models.FederatedActivity, 0) + + for _, row := range rows { + singleActivity := models.FederatedActivity{ + IRI: row.Iri, + ActorIRI: row.Actor, + Type: row.Type, + Timestamp: row.Timestamp, + } + activities = append(activities, singleActivity) + } + + return activities, nil +} + +// HasPreviouslyHandledInboundActivity will return if we have previously handled +// an inbound federated activity. +func HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) { + exists, err := _datastore.GetQueries().DoesInboundActivityExist(context.Background(), db.DoesInboundActivityExistParams{ + Iri: iri, + Actor: actorIRI, + Type: eventType, + }) + if err != nil { + return false, err + } + + return exists > 0, nil +} diff --git a/activitypub/requests/acceptFollow.go b/activitypub/requests/acceptFollow.go new file mode 100644 index 000000000..48b5b522c --- /dev/null +++ b/activitypub/requests/acceptFollow.go @@ -0,0 +1,51 @@ +package requests + +import ( + "encoding/json" + "net/url" + + "github.com/go-fed/activity/streams" + "github.com/go-fed/activity/streams/vocab" + "github.com/owncast/owncast/activitypub/apmodels" + "github.com/owncast/owncast/activitypub/workerpool" + + "github.com/teris-io/shortid" +) + +// SendFollowAccept will send an accept activity to a follow request from a specified local user. +func SendFollowAccept(inbox *url.URL, followRequestIRI *url.URL, fromLocalAccountName string) error { + followAccept := makeAcceptFollow(followRequestIRI, fromLocalAccountName) + localAccountIRI := apmodels.MakeLocalIRIForAccount(fromLocalAccountName) + + var jsonmap map[string]interface{} + jsonmap, _ = streams.Serialize(followAccept) + b, _ := json.Marshal(jsonmap) + req, err := CreateSignedRequest(b, inbox, localAccountIRI) + if err != nil { + return err + } + + workerpool.AddToOutboundQueue(req) + + return nil +} + +func makeAcceptFollow(followRequestIri *url.URL, fromAccountName string) vocab.ActivityStreamsAccept { + acceptIDString := shortid.MustGenerate() + acceptID := apmodels.MakeLocalIRIForResource(acceptIDString) + actorID := apmodels.MakeLocalIRIForAccount(fromAccountName) + + accept := streams.NewActivityStreamsAccept() + idProperty := streams.NewJSONLDIdProperty() + idProperty.SetIRI(acceptID) + accept.SetJSONLDId(idProperty) + + actor := apmodels.MakeActorPropertyWithID(actorID) + accept.SetActivityStreamsActor(actor) + + object := streams.NewActivityStreamsObjectProperty() + object.AppendIRI(followRequestIri) + accept.SetActivityStreamsObject(object) + + return accept +} diff --git a/activitypub/requests/http.go b/activitypub/requests/http.go new file mode 100644 index 000000000..7166fc5b0 --- /dev/null +++ b/activitypub/requests/http.go @@ -0,0 +1,75 @@ +package requests + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/go-fed/activity/streams" + "github.com/go-fed/activity/streams/vocab" + "github.com/owncast/owncast/activitypub/crypto" + + "github.com/owncast/owncast/config" + + log "github.com/sirupsen/logrus" +) + +// WriteStreamResponse will write a ActivityPub object to the provided ResponseWriter and sign with the provided key. +func WriteStreamResponse(item vocab.Type, w http.ResponseWriter, publicKey crypto.PublicKey) error { + var jsonmap map[string]interface{} + jsonmap, _ = streams.Serialize(item) + b, err := json.Marshal(jsonmap) + if err != nil { + return err + } + + return WriteResponse(b, w, publicKey) +} + +// WritePayloadResponse will write any arbitrary object to the provided ResponseWriter and sign with the provided key. +func WritePayloadResponse(payload interface{}, w http.ResponseWriter, publicKey crypto.PublicKey) error { + b, err := json.Marshal(payload) + if err != nil { + return err + } + + return WriteResponse(b, w, publicKey) +} + +// WriteResponse will write any arbitrary payload to the provided ResponseWriter and sign with the provided key. +func WriteResponse(payload []byte, w http.ResponseWriter, publicKey crypto.PublicKey) error { + w.Header().Set("Content-Type", "application/activity+json") + + if err := crypto.SignResponse(w, payload, publicKey); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Errorln("unable to sign response", err) + return err + } + + if _, err := w.Write(payload); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return err + } + + return nil +} + +// CreateSignedRequest will create a signed POST request of a payload to the provided destination. +func CreateSignedRequest(payload []byte, url *url.URL, fromActorIRI *url.URL) (*http.Request, error) { + log.Debugln("Sending", string(payload), "to", url) + + req, _ := http.NewRequest("POST", url.String(), bytes.NewBuffer(payload)) + + ua := fmt.Sprintf("%s; https://owncast.online", config.GetReleaseString()) + req.Header.Set("User-Agent", ua) + req.Header.Set("Content-Type", "application/activity+json") + + if err := crypto.SignRequest(req, payload, fromActorIRI); err != nil { + log.Errorln("error signing request:", err) + return nil, err + } + + return req, nil +} diff --git a/activitypub/resolvers/follow.go b/activitypub/resolvers/follow.go new file mode 100644 index 000000000..8a2c2aa97 --- /dev/null +++ b/activitypub/resolvers/follow.go @@ -0,0 +1,57 @@ +package resolvers + +import ( + "context" + "errors" + "fmt" + + "github.com/go-fed/activity/streams/vocab" + "github.com/owncast/owncast/activitypub/apmodels" + log "github.com/sirupsen/logrus" +) + +func getPersonFromFollow(activity vocab.ActivityStreamsFollow) (apmodels.ActivityPubActor, error) { + return GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor()) +} + +// MakeFollowRequest will convert an inbound Follow request to our internal actor model. +func MakeFollowRequest(c context.Context, activity vocab.ActivityStreamsFollow) (*apmodels.ActivityPubActor, error) { + person, err := getPersonFromFollow(activity) + if err != nil { + return nil, errors.New("unable to resolve person from follow request: " + err.Error()) + } + + hostname := person.ActorIri.Hostname() + username := person.Username + fullUsername := fmt.Sprintf("%s@%s", username, hostname) + + followRequest := apmodels.ActivityPubActor{ + ActorIri: person.ActorIri, + FollowRequestIri: activity.GetJSONLDId().Get(), + Inbox: person.Inbox, + Name: person.Name, + Username: fullUsername, + Image: person.Image, + } + + return &followRequest, nil +} + +// MakeUnFollowRequest will convert an inbound Unfollow request to our internal actor model. +func MakeUnFollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) *apmodels.ActivityPubActor { + person, err := GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor()) + if err != nil { + log.Errorln("unable to resolve person from actor iri", person.ActorIri, err) + return nil + } + + unfollowRequest := apmodels.ActivityPubActor{ + ActorIri: person.ActorIri, + FollowRequestIri: activity.GetJSONLDId().Get(), + Inbox: person.Inbox, + Name: person.Name, + Image: person.Image, + } + + return &unfollowRequest +} diff --git a/activitypub/resolvers/resolve.go b/activitypub/resolvers/resolve.go new file mode 100644 index 000000000..dc0d00396 --- /dev/null +++ b/activitypub/resolvers/resolve.go @@ -0,0 +1,125 @@ +package resolvers + +import ( + "context" + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/go-fed/activity/streams" + "github.com/go-fed/activity/streams/vocab" + "github.com/owncast/owncast/activitypub/apmodels" + "github.com/owncast/owncast/activitypub/crypto" + "github.com/owncast/owncast/core/data" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +// Resolve will translate a raw ActivityPub payload and fire the callback associated with that activity type. +func Resolve(c context.Context, data []byte, callbacks ...interface{}) error { + jsonResolver, err := streams.NewJSONResolver(callbacks...) + if err != nil { + // Something in the setup was wrong. For example, a callback has an + // unsupported signature and would never be called + return err + } + + var jsonMap map[string]interface{} + if err = json.Unmarshal(data, &jsonMap); err != nil { + return err + } + + log.Debugln("Resolving payload...", string(data)) + + // The createCallback function will be called. + err = jsonResolver.Resolve(c, jsonMap) + if err != nil && !streams.IsUnmatchedErr(err) { + // Something went wrong + return err + } else if streams.IsUnmatchedErr(err) { + // Everything went right but the callback didn't match or the ActivityStreams + // type is one that wasn't code generated. + log.Debugln("No match: ", err) + } + + return nil +} + +// ResolveIRI will resolve an IRI ahd call the correct callback for the resolved type. +func ResolveIRI(c context.Context, iri string, callbacks ...interface{}) error { + log.Debugln("Resolving", iri) + + req, _ := http.NewRequest("GET", iri, nil) + + actor := apmodels.MakeLocalIRIForAccount(data.GetDefaultFederationUsername()) + if err := crypto.SignRequest(req, nil, actor); err != nil { + return err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + + // fmt.Println(string(data)) + return Resolve(c, data, callbacks...) +} + +// GetResolvedActorFromActorProperty resolve an actor property to a fully populated person. +func GetResolvedActorFromActorProperty(actor vocab.ActivityStreamsActorProperty) (apmodels.ActivityPubActor, error) { + var err error + var apActor apmodels.ActivityPubActor + + personCallback := func(c context.Context, person vocab.ActivityStreamsPerson) error { + apActor = apmodels.MakeActorFromPerson(person) + return nil + } + + serviceCallback := func(c context.Context, s vocab.ActivityStreamsService) error { + apActor = apmodels.MakeActorFromService(s) + return nil + } + + for iter := actor.Begin(); iter != actor.End(); iter = iter.Next() { + if iter.IsIRI() { + iri := iter.GetIRI() + if e := ResolveIRI(context.Background(), iri.String(), personCallback, serviceCallback); e != nil { + err = e + } + } else if iter.IsActivityStreamsPerson() { + person := iter.GetActivityStreamsPerson() + apActor = apmodels.MakeActorFromPerson(person) + } + } + + return apActor, errors.Wrap(err, "unable to resolve actor from actor property") +} + +// GetResolvedActorFromIRI will resolve an IRI string to a fully populated actor. +func GetResolvedActorFromIRI(personOrServiceIRI string) (apmodels.ActivityPubActor, error) { + var err error + var apActor apmodels.ActivityPubActor + + personCallback := func(c context.Context, person vocab.ActivityStreamsPerson) error { + apActor = apmodels.MakeActorFromPerson(person) + return nil + } + + serviceCallback := func(c context.Context, s vocab.ActivityStreamsService) error { + apActor = apmodels.MakeActorFromService(s) + return nil + } + + if e := ResolveIRI(context.Background(), personOrServiceIRI, personCallback, serviceCallback); e != nil { + err = e + } + + return apActor, errors.Wrap(err, "unable to resolve actor from IRI string: "+personOrServiceIRI) +} diff --git a/activitypub/router.go b/activitypub/router.go new file mode 100644 index 000000000..a7dc81d75 --- /dev/null +++ b/activitypub/router.go @@ -0,0 +1,35 @@ +package activitypub + +import ( + "net/http" + + "github.com/owncast/owncast/activitypub/controllers" + "github.com/owncast/owncast/router/middleware" +) + +// StartRouter will start the federation specific http router. +func StartRouter() { + // WebFinger + http.HandleFunc("/.well-known/webfinger", controllers.WebfingerHandler) + + // Host Metadata + http.HandleFunc("/.well-known/host-meta", controllers.HostMetaController) + + // Nodeinfo v1 + http.HandleFunc("/.well-known/nodeinfo", controllers.NodeInfoController) + + // x-nodeinfo v2 + http.HandleFunc("/.well-known/x-nodeinfo2", controllers.XNodeInfo2Controller) + + // Nodeinfo v2 + http.HandleFunc("/nodeinfo/2.0", controllers.NodeInfoV2Controller) + + // Instance details + http.HandleFunc("/api/v1/instance", controllers.InstanceV1Controller) + + // Single ActivityPub Actor + http.HandleFunc("/federation/user/", middleware.RequireActivityPubOrRedirect(controllers.ActorHandler)) + + // Single AP object + http.HandleFunc("/federation/", middleware.RequireActivityPubOrRedirect(controllers.ObjectHandler)) +} diff --git a/activitypub/workerpool/outbound.go b/activitypub/workerpool/outbound.go new file mode 100644 index 000000000..6c211afcb --- /dev/null +++ b/activitypub/workerpool/outbound.go @@ -0,0 +1,66 @@ +package workerpool + +import ( + "net/http" + + log "github.com/sirupsen/logrus" +) + +const ( + // ActivityPubWorkerPoolSize defines the number of concurrent HTTP ActivityPub requests. + ActivityPubWorkerPoolSize = 10 +) + +// Job struct bundling the ActivityPub and the payload in one struct. +type Job struct { + request *http.Request +} + +var queue chan Job + +// InitOutboundWorkerPool starts n go routines that await ActivityPub jobs. +func InitOutboundWorkerPool() { + queue = make(chan Job) + + // start workers + for i := 1; i <= ActivityPubWorkerPoolSize; i++ { + go worker(i, queue) + } +} + +// AddToOutboundQueue will queue up an outbound http request. +func AddToOutboundQueue(req *http.Request) { + log.Tracef("Queued request for ActivityPub destination %s", req.RequestURI) + queue <- Job{req} +} + +func worker(workerID int, queue <-chan Job) { + log.Debugf("Started ActivityPub worker %d", workerID) + + for job := range queue { + if err := sendActivityPubMessageToInbox(job); err != nil { + log.Errorf("ActivityPub destination %s failed to send Error: %s", job.request.RequestURI, err) + } + log.Tracef("Done with ActivityPub destination %s using worker %d", job.request.RequestURI, workerID) + } +} + +func sendActivityPubMessageToInbox(job Job) error { + // req, err := http.NewRequest("POST", job.inbox.String(), bytes.NewReader(job.payload)) + // if err != nil { + // return err + // } + + // req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + + resp, err := client.Do(job.request) + if err != nil { + return err + } + + defer resp.Body.Close() + + return nil +} diff --git a/config/defaults.go b/config/defaults.go index de71849fd..6b5bd1d90 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -24,6 +24,9 @@ type Defaults struct { SegmentLengthSeconds int SegmentsInPlaylist int StreamVariants []models.StreamOutputVariant + + FederationUsername string + FederationGoLiveMessage string } // GetDefaults will return default configuration values. @@ -59,5 +62,8 @@ func GetDefaults() Defaults { CPUUsageLevel: 2, }, }, + + FederationUsername: "streamer", + FederationGoLiveMessage: "I've gone live!", } } diff --git a/controllers/admin/config.go b/controllers/admin/config.go index e2fd85a2a..529ace488 100644 --- a/controllers/admin/config.go +++ b/controllers/admin/config.go @@ -11,6 +11,7 @@ import ( "reflect" "strings" + "github.com/owncast/owncast/activitypub/outbox" "github.com/owncast/owncast/controllers" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" @@ -46,6 +47,12 @@ func SetTags(w http.ResponseWriter, r *http.Request) { return } + // Update Fediverse followers about this change. + if err := outbox.UpdateFollowersWithAccountUpdates(); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "changed") } @@ -99,6 +106,12 @@ func SetServerName(w http.ResponseWriter, r *http.Request) { return } + // Update Fediverse followers about this change. + if err := outbox.UpdateFollowersWithAccountUpdates(); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "changed") } @@ -118,6 +131,12 @@ func SetServerSummary(w http.ResponseWriter, r *http.Request) { return } + // Update Fediverse followers about this change. + if err := outbox.UpdateFollowersWithAccountUpdates(); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "changed") } @@ -233,6 +252,12 @@ func SetLogo(w http.ResponseWriter, r *http.Request) { return } + // Update Fediverse followers about this change. + if err := outbox.UpdateFollowersWithAccountUpdates(); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "changed") } @@ -504,6 +529,12 @@ func SetSocialHandles(w http.ResponseWriter, r *http.Request) { return } + // Update Fediverse followers about this change. + if err := outbox.UpdateFollowersWithAccountUpdates(); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "social handles updated") } diff --git a/controllers/admin/federation.go b/controllers/admin/federation.go new file mode 100644 index 000000000..43f2aa695 --- /dev/null +++ b/controllers/admin/federation.go @@ -0,0 +1,171 @@ +package admin + +import ( + "net/http" + + "github.com/owncast/owncast/activitypub" + "github.com/owncast/owncast/activitypub/outbox" + "github.com/owncast/owncast/activitypub/persistence" + "github.com/owncast/owncast/controllers" + "github.com/owncast/owncast/core/data" +) + +// SendFederatedMessage will send a manual message to the fediverse. +func SendFederatedMessage(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValue, success := getValueFromRequest(w, r) + if !success { + return + } + + message, ok := configValue.Value.(string) + if !ok { + controllers.WriteSimpleResponse(w, false, "unable to send message") + } + + if err := activitypub.SendPublicFederatedMessage(message); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteSimpleResponse(w, true, "sent") +} + +// SetFederationEnabled will set if Federation features are enabled. +func SetFederationEnabled(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValue, success := getValueFromRequest(w, r) + if !success { + return + } + + if err := data.SetFederationEnabled(configValue.Value.(bool)); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "federation features saved") +} + +// SetFederationActivityPrivate will set if Federation features are private to followers. +func SetFederationActivityPrivate(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValue, success := getValueFromRequest(w, r) + if !success { + return + } + + if err := data.SetFederationIsPrivate(configValue.Value.(bool)); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + // Update Fediverse followers about this change. + if err := outbox.UpdateFollowersWithAccountUpdates(); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteSimpleResponse(w, true, "federation private saved") +} + +// SetFederationShowEngagement will set if Fedivese engagement shows in chat. +func SetFederationShowEngagement(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValue, success := getValueFromRequest(w, r) + if !success { + return + } + + if err := data.SetFederationShowEngagement(configValue.Value.(bool)); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + controllers.WriteSimpleResponse(w, true, "federation show engagement saved") +} + +// SetFederationUsername will set the local actor username used for federation activities. +func SetFederationUsername(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValue, success := getValueFromRequest(w, r) + if !success { + return + } + + if err := data.SetFederationUsername(configValue.Value.(string)); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteSimpleResponse(w, true, "username saved") +} + +// SetFederationGoLiveMessage will set the federated message sent when the streamer goes live. +func SetFederationGoLiveMessage(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValue, success := getValueFromRequest(w, r) + if !success { + return + } + + if err := data.SetFederationGoLiveMessage(configValue.Value.(string)); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteSimpleResponse(w, true, "message saved") +} + +// SetFederationBlockDomains saves a list of domains to block on the Fediverse. +func SetFederationBlockDomains(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + configValues, success := getValuesFromRequest(w, r) + if !success { + controllers.WriteSimpleResponse(w, false, "unable to handle provided domains") + return + } + + domainStrings := make([]string, 0) + for _, domain := range configValues { + domainStrings = append(domainStrings, domain.Value.(string)) + } + + if err := data.SetBlockedFederatedDomains(domainStrings); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteSimpleResponse(w, true, "saved") +} + +// GetFederatedActions will return the saved list of accepted inbound +// federated activities. +func GetFederatedActions(w http.ResponseWriter, r *http.Request) { + activities, err := persistence.GetInboundActivities(100, 0) + if err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteResponse(w, activities) +} diff --git a/controllers/admin/followers.go b/controllers/admin/followers.go new file mode 100644 index 000000000..624688c98 --- /dev/null +++ b/controllers/admin/followers.go @@ -0,0 +1,82 @@ +package admin + +import ( + "encoding/json" + "net/http" + + "github.com/owncast/owncast/activitypub/persistence" + "github.com/owncast/owncast/activitypub/requests" + "github.com/owncast/owncast/controllers" + "github.com/owncast/owncast/core/data" +) + +// ApproveFollower will approve a federated follow request. +func ApproveFollower(w http.ResponseWriter, r *http.Request) { + if !requirePOST(w, r) { + return + } + + type approveFollowerRequest struct { + ActorIRI string `json:"actorIRI"` + Approved bool `json:"approved"` + } + + decoder := json.NewDecoder(r.Body) + var approval approveFollowerRequest + if err := decoder.Decode(&approval); err != nil { + controllers.WriteSimpleResponse(w, false, "unable to handle follower state with provided values") + return + } + + if approval.Approved { + // Approve a follower + if err := persistence.ApprovePreviousFollowRequest(approval.ActorIRI); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + localAccountName := data.GetDefaultFederationUsername() + + follower, err := persistence.GetFollower(approval.ActorIRI) + if err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + // Send the approval to the follow requestor. + if err := requests.SendFollowAccept(follower.Inbox, follower.FollowRequestIri, localAccountName); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + } else { + // Remove/block a follower + if err := persistence.BlockOrRejectFollower(approval.ActorIRI); err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + } + + controllers.WriteSimpleResponse(w, true, "follower updated") +} + +// GetPendingFollowRequests will return a list of pending follow requests. +func GetPendingFollowRequests(w http.ResponseWriter, r *http.Request) { + requests, err := persistence.GetPendingFollowRequests() + if err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteResponse(w, requests) +} + +// GetBlockedAndRejectedFollowers will return blocked and rejected followers. +func GetBlockedAndRejectedFollowers(w http.ResponseWriter, r *http.Request) { + rejections, err := persistence.GetBlockedAndRejectedFollowers() + if err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + controllers.WriteResponse(w, rejections) +} diff --git a/controllers/admin/serverConfig.go b/controllers/admin/serverConfig.go index 8270e9622..046dec6b6 100644 --- a/controllers/admin/serverConfig.go +++ b/controllers/admin/serverConfig.go @@ -66,6 +66,14 @@ func GetServerConfig(w http.ResponseWriter, r *http.Request) { VideoCodec: data.GetVideoCodec(), ForbiddenUsernames: usernameBlocklist, SuggestedUsernames: usernameSuggestions, + Federation: federationConfigResponse{ + Enabled: data.GetFederationEnabled(), + IsPrivate: data.GetFederationIsPrivate(), + Username: data.GetFederationUsername(), + GoLiveMessage: data.GetFederationGoLiveMessage(), + ShowEngagement: data.GetFederationShowEngagement(), + BlockedDomains: data.GetBlockedFederatedDomains(), + }, } w.Header().Set("Content-Type", "application/json") @@ -77,21 +85,22 @@ func GetServerConfig(w http.ResponseWriter, r *http.Request) { } type serverConfigAdminResponse struct { - InstanceDetails webConfigResponse `json:"instanceDetails"` - FFmpegPath string `json:"ffmpegPath"` - StreamKey string `json:"streamKey"` - WebServerPort int `json:"webServerPort"` - WebServerIP string `json:"webServerIP"` - RTMPServerPort int `json:"rtmpServerPort"` - S3 models.S3 `json:"s3"` - VideoSettings videoSettings `json:"videoSettings"` - YP yp `json:"yp"` - ChatDisabled bool `json:"chatDisabled"` - ExternalActions []models.ExternalAction `json:"externalActions"` - SupportedCodecs []string `json:"supportedCodecs"` - VideoCodec string `json:"videoCodec"` - ForbiddenUsernames []string `json:"forbiddenUsernames"` - SuggestedUsernames []string `json:"suggestedUsernames"` + InstanceDetails webConfigResponse `json:"instanceDetails"` + FFmpegPath string `json:"ffmpegPath"` + StreamKey string `json:"streamKey"` + WebServerPort int `json:"webServerPort"` + WebServerIP string `json:"webServerIP"` + RTMPServerPort int `json:"rtmpServerPort"` + S3 models.S3 `json:"s3"` + VideoSettings videoSettings `json:"videoSettings"` + YP yp `json:"yp"` + ChatDisabled bool `json:"chatDisabled"` + ExternalActions []models.ExternalAction `json:"externalActions"` + SupportedCodecs []string `json:"supportedCodecs"` + VideoCodec string `json:"videoCodec"` + ForbiddenUsernames []string `json:"forbiddenUsernames"` + Federation federationConfigResponse `json:"federation"` + SuggestedUsernames []string `json:"suggestedUsernames"` } type videoSettings struct { @@ -118,3 +127,12 @@ type yp struct { InstanceURL string `json:"instanceUrl"` // The public URL the directory should link to YPServiceURL string `json:"-"` // The base URL to the YP API to register with (optional) } + +type federationConfigResponse struct { + Enabled bool `json:"enabled"` + IsPrivate bool `json:"isPrivate"` + Username string `json:"username"` + GoLiveMessage string `json:"goLiveMessage"` + ShowEngagement bool `json:"showEngagement"` + BlockedDomains []string `json:"blockedDomains"` +} diff --git a/controllers/config.go b/controllers/config.go index 9e5c232df..b91152bdc 100644 --- a/controllers/config.go +++ b/controllers/config.go @@ -2,8 +2,11 @@ package controllers import ( "encoding/json" + "fmt" "net/http" + "net/url" + "github.com/owncast/owncast/activitypub" "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/data" "github.com/owncast/owncast/models" @@ -12,19 +15,26 @@ import ( ) type webConfigResponse struct { - Name string `json:"name"` - Summary string `json:"summary"` - Logo string `json:"logo"` - Tags []string `json:"tags"` - Version string `json:"version"` - NSFW bool `json:"nsfw"` - ExtraPageContent string `json:"extraPageContent"` - StreamTitle string `json:"streamTitle,omitempty"` // What's going on with the current stream - SocialHandles []models.SocialHandle `json:"socialHandles"` - ChatDisabled bool `json:"chatDisabled"` - ExternalActions []models.ExternalAction `json:"externalActions"` - CustomStyles string `json:"customStyles"` - MaxSocketPayloadSize int `json:"maxSocketPayloadSize"` + Name string `json:"name"` + Summary string `json:"summary"` + Logo string `json:"logo"` + Tags []string `json:"tags"` + Version string `json:"version"` + NSFW bool `json:"nsfw"` + ExtraPageContent string `json:"extraPageContent"` + StreamTitle string `json:"streamTitle,omitempty"` // What's going on with the current stream + SocialHandles []models.SocialHandle `json:"socialHandles"` + ChatDisabled bool `json:"chatDisabled"` + ExternalActions []models.ExternalAction `json:"externalActions"` + CustomStyles string `json:"customStyles"` + MaxSocketPayloadSize int `json:"maxSocketPayloadSize"` + Federation federationConfigResponse `json:"federation"` +} + +type federationConfigResponse struct { + Enabled bool `json:"enabled"` + Account string `json:"account,omitempty"` + FollowerCount int `json:"followerCount,omitempty"` } // GetWebConfig gets the status of the server. @@ -46,6 +56,21 @@ func GetWebConfig(w http.ResponseWriter, r *http.Request) { serverSummary := data.GetServerSummary() serverSummary = utils.RenderPageContentMarkdown(serverSummary) + var federationResponse federationConfigResponse + federationEnabled := data.GetFederationEnabled() + + followerCount, _ := activitypub.GetFollowerCount() + if federationEnabled { + serverURLString := data.GetServerURL() + serverURL, _ := url.Parse(serverURLString) + account := fmt.Sprintf("%s@%s", data.GetDefaultFederationUsername(), serverURL.Host) + federationResponse = federationConfigResponse{ + Enabled: federationEnabled, + FollowerCount: int(followerCount), + Account: account, + } + } + configuration := webConfigResponse{ Name: data.GetServerName(), Summary: serverSummary, @@ -60,6 +85,7 @@ func GetWebConfig(w http.ResponseWriter, r *http.Request) { ExternalActions: data.GetExternalActions(), CustomStyles: data.GetCustomStyles(), MaxSocketPayloadSize: config.MaxSocketPayloadSize, + Federation: federationResponse, } if err := json.NewEncoder(w).Encode(configuration); err != nil { diff --git a/controllers/followers.go b/controllers/followers.go new file mode 100644 index 000000000..8561d25a4 --- /dev/null +++ b/controllers/followers.go @@ -0,0 +1,18 @@ +package controllers + +import ( + "net/http" + + "github.com/owncast/owncast/activitypub/persistence" +) + +// GetFollowers will handle an API request to fetch the list of followers (non-activitypub response). +func GetFollowers(w http.ResponseWriter, r *http.Request) { + followers, err := persistence.GetFederationFollowers(-1, 0) + if err != nil { + WriteSimpleResponse(w, false, "unable to fetch followers") + return + } + + WriteResponse(w, followers) +} diff --git a/controllers/index.go b/controllers/index.go index 3f37f1083..b720beee2 100644 --- a/controllers/index.go +++ b/controllers/index.go @@ -35,6 +35,15 @@ type MetadataPage struct { // IndexHandler handles the default index route. func IndexHandler(w http.ResponseWriter, r *http.Request) { middleware.EnableCors(w) + + // Treat recordings and schedule as index requests + pathComponents := strings.Split(r.URL.Path, "/") + pathRequest := pathComponents[1] + + if pathRequest == "recordings" || pathRequest == "schedule" { + r.URL.Path = "index.html" + } + isIndexRequest := r.URL.Path == "/" || filepath.Base(r.URL.Path) == "index.html" || filepath.Base(r.URL.Path) == "" // For search engine bots and social scrapers return a special @@ -91,11 +100,11 @@ func handleScraperMetadataPage(w http.ResponseWriter, r *http.Request) { fullURL, err := url.Parse(fmt.Sprintf("%s://%s%s", scheme, r.Host, r.URL.Path)) if err != nil { - log.Panicln(err) + log.Errorln(err) } imageURL, err := url.Parse(fmt.Sprintf("%s://%s%s", scheme, r.Host, "/logo/external")) if err != nil { - log.Panicln(err) + log.Errorln(err) } status := core.GetStatus() @@ -128,6 +137,6 @@ func handleScraperMetadataPage(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html") if err := tmpl.Execute(w, metadata); err != nil { - log.Panicln(err) + log.Errorln(err) } } diff --git a/controllers/remoteFollow.go b/controllers/remoteFollow.go new file mode 100644 index 000000000..9ea585cc6 --- /dev/null +++ b/controllers/remoteFollow.go @@ -0,0 +1,100 @@ +package controllers + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/owncast/owncast/core/data" +) + +// RemoteFollow handles a request to begin the remote follow redirect flow. +func RemoteFollow(w http.ResponseWriter, r *http.Request) { + type followRequest struct { + Account string `json:"account"` + } + + type followResponse struct { + RedirectURL string `json:"redirectUrl"` + } + + var request followRequest + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(&request); err != nil { + WriteSimpleResponse(w, false, "unable to parse request") + return + } + + if request.Account == "" { + WriteSimpleResponse(w, false, "Remote Fediverse account is required to follow.") + return + } + + localActorPath, _ := url.Parse(data.GetServerURL()) + localActorPath.Path = fmt.Sprintf("/federation/user/%s", data.GetDefaultFederationUsername()) + var template string + links, err := getWebfingerLinks(request.Account) + if err != nil { + WriteSimpleResponse(w, false, err.Error()) + return + } + + // Acquire the remote follow redirect template. + for _, link := range links { + for k, v := range link { + if k == "rel" && v == "http://ostatus.org/schema/1.0/subscribe" && link["template"] != nil { + template = link["template"].(string) + } + } + } + + if localActorPath.String() == "" || template == "" { + WriteSimpleResponse(w, false, "unable to determine remote follow information for "+request.Account) + return + } + + redirectURL := strings.Replace(template, "{uri}", localActorPath.String(), 1) + response := followResponse{ + RedirectURL: redirectURL, + } + + WriteResponse(w, response) +} + +func getWebfingerLinks(account string) ([]map[string]interface{}, error) { + type webfingerResponse struct { + Links []map[string]interface{} `json:"links"` + } + + account = strings.TrimLeft(account, "@") // remove any leading @ + accountComponents := strings.Split(account, "@") + fediverseServer := accountComponents[1] + + // HTTPS is required. + requestURL, err := url.Parse("https://" + fediverseServer) + if err != nil { + return nil, fmt.Errorf("unable to parse fediverse server host %s", fediverseServer) + } + + requestURL.Path = "/.well-known/webfinger" + query := requestURL.Query() + query.Add("resource", fmt.Sprintf("acct:%s", account)) + requestURL.RawQuery = query.Encode() + + response, err := http.DefaultClient.Get(requestURL.String()) + if err != nil { + return nil, err + } + + defer response.Body.Close() + + var links webfingerResponse + decoder := json.NewDecoder(response.Body) + if err := decoder.Decode(&links); err != nil { + return nil, err + } + + return links.Links, nil +} diff --git a/core/chat/chat.go b/core/chat/chat.go index 11735e99b..975b12c5e 100644 --- a/core/chat/chat.go +++ b/core/chat/chat.go @@ -78,12 +78,39 @@ func SendSystemMessage(text string, ephemeral bool) error { } if !ephemeral { - saveEvent(message.ID, "system", message.Body, message.GetMessageType(), nil, message.Timestamp) + saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) } return nil } +// SendFediverseAction will send a message indicating some Fediverse engagement took place. +func SendFediverseAction(eventType string, userAccountName string, image *string, body string, link string) error { + message := events.FediverseEngagementEvent{ + Event: events.Event{ + Type: eventType, + }, + MessageEvent: events.MessageEvent{ + Body: body, + }, + UserAccountName: userAccountName, + Image: image, + Link: link, + } + + message.SetDefaults() + message.RenderBody() + + if err := Broadcast(&message); err != nil { + log.Errorln("error sending system message", err) + return err + } + + saveFederatedAction(message) + + return nil +} + // SendSystemAction will send a system action string as an action event to all clients. func SendSystemAction(text string, ephemeral bool) error { message := events.ActionEvent{ @@ -100,7 +127,7 @@ func SendSystemAction(text string, ephemeral bool) error { } if !ephemeral { - saveEvent(message.ID, "action", message.Body, message.GetMessageType(), nil, message.Timestamp) + saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) } return nil diff --git a/core/chat/events/eventtype.go b/core/chat/events/eventtype.go index 459f9d8fb..a0d15b63a 100644 --- a/core/chat/events/eventtype.go +++ b/core/chat/events/eventtype.go @@ -34,4 +34,10 @@ const ( ErrorMaxConnectionsExceeded EventType = "ERROR_MAX_CONNECTIONS_EXCEEDED" // ErrorUserDisabled is an error returned when the connecting user has been previously banned/disabled. ErrorUserDisabled EventType = "ERROR_USER_DISABLED" + // FediverseEngagementFollow is an event representing a follow action that took place on the fediverse. + FediverseEngagementFollow EventType = "FEDIVERSE_ENGAGEMENT_FOLLOW" + // FediverseEngagementLike is an event representing a like action that took place on the fediverse. + FediverseEngagementLike EventType = "FEDIVERSE_ENGAGEMENT_LIKE" + // FediverseEngagementRepost is an event representing a re-post action that took place on the fediverse. + FediverseEngagementRepost EventType = "FEDIVERSE_ENGAGEMENT_REPOST" ) diff --git a/core/chat/events/fediverseEngagementEvent.go b/core/chat/events/fediverseEngagementEvent.go new file mode 100644 index 000000000..7bc841783 --- /dev/null +++ b/core/chat/events/fediverseEngagementEvent.go @@ -0,0 +1,32 @@ +package events + +import "github.com/owncast/owncast/core/data" + +// FediverseEngagementEvent is a message displayed in chat on representing an action on the Fediverse. +type FediverseEngagementEvent struct { + Event + MessageEvent + Image *string `json:"image"` + Link string `json:"link"` + UserAccountName string `json:"title"` +} + +// GetBroadcastPayload will return the object to send to all chat users. +func (e *FediverseEngagementEvent) GetBroadcastPayload() EventPayload { + return EventPayload{ + "id": e.ID, + "timestamp": e.Timestamp, + "body": e.Body, + "image": e.Image, + "type": e.Event.Type, + "title": e.UserAccountName, + "user": EventPayload{ + "displayName": data.GetServerName(), + }, + } +} + +// GetMessageType will return the event type for this message. +func (e *FediverseEngagementEvent) GetMessageType() EventType { + return e.Event.Type +} diff --git a/core/chat/persistence.go b/core/chat/persistence.go index 716b71e5c..17bb7796e 100644 --- a/core/chat/persistence.go +++ b/core/chat/persistence.go @@ -34,17 +34,19 @@ func setupPersistence() { // SaveUserMessage will save a single chat event to the messages database. func SaveUserMessage(event events.UserMessageEvent) { - saveEvent(event.ID, event.User.ID, event.Body, event.Type, event.HiddenAt, event.Timestamp) + saveEvent(event.ID, &event.User.ID, event.Body, event.Type, event.HiddenAt, event.Timestamp, nil, nil, nil, nil) } -func saveEvent(id string, userID string, body string, eventType string, hidden *time.Time, timestamp time.Time) { +func saveFederatedAction(event events.FediverseEngagementEvent) { + saveEvent(event.ID, nil, event.Body, event.Type, nil, event.Timestamp, event.Image, &event.Link, &event.UserAccountName, nil) +} + +// nolint: unparam +func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) { defer func() { _historyCache = nil }() - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { log.Errorln("error saving", eventType, err) @@ -53,7 +55,7 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * defer tx.Rollback() // nolint - stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp) values(?, ?, ?, ?, ?, ?)") + stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") if err != nil { log.Errorln("error saving", eventType, err) return @@ -61,7 +63,7 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * defer stmt.Close() - if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp); err != nil { + if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil { log.Errorln("error saving", eventType, err) return } @@ -71,8 +73,134 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * } } -func getChat(query string) []events.UserMessageEvent { - history := make([]events.UserMessageEvent, 0) +func makeUserMessageEventFromRowData(row rowData) events.UserMessageEvent { + scopes := "" + if row.userScopes != nil { + scopes = *row.userScopes + } + + previousUsernames := []string{} + if row.previousUsernames != nil { + previousUsernames = strings.Split(*row.previousUsernames, ",") + } + + displayName := "" + if row.userDisplayName != nil { + displayName = *row.userDisplayName + } + + displayColor := 0 + if row.userDisplayColor != nil { + displayColor = *row.userDisplayColor + } + + createdAt := time.Time{} + if row.userCreatedAt != nil { + createdAt = *row.userCreatedAt + } + + u := user.User{ + ID: *row.userID, + AccessToken: "", + DisplayName: displayName, + DisplayColor: displayColor, + CreatedAt: createdAt, + DisabledAt: row.userDisabledAt, + NameChangedAt: row.userNameChangedAt, + PreviousNames: previousUsernames, + Scopes: strings.Split(scopes, ","), + } + + message := events.UserMessageEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + UserEvent: events.UserEvent{ + User: &u, + HiddenAt: row.hiddenAt, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + } + + return message +} + +func makeSystemMessageChatEventFromRowData(row rowData) events.SystemMessageEvent { + message := events.SystemMessageEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + } + return message +} + +func makeActionMessageChatEventFromRowData(row rowData) events.ActionEvent { + message := events.ActionEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + } + return message +} + +func makeFederatedActionChatEventFromRowData(row rowData) events.FediverseEngagementEvent { + message := events.FediverseEngagementEvent{ + Event: events.Event{ + Type: row.eventType, + ID: row.id, + Timestamp: row.timestamp, + }, + MessageEvent: events.MessageEvent{ + Body: row.body, + RawBody: row.body, + }, + Image: row.image, + Link: *row.link, + UserAccountName: *row.title, + } + return message +} + +type rowData struct { + id string + userID *string + body string + eventType models.EventType + hiddenAt *time.Time + timestamp time.Time + title *string + subtitle *string + image *string + link *string + + userDisplayName *string + userDisplayColor *int + userCreatedAt *time.Time + userDisabledAt *time.Time + previousUsernames *string + userNameChangedAt *time.Time + userScopes *string +} + +func getChat(query string) []interface{} { + history := make([]interface{}, 0) rows, err := _datastore.DB.Query(query) if err != nil || rows.Err() != nil { log.Errorln("error fetching chat history", err) @@ -81,69 +209,47 @@ func getChat(query string) []events.UserMessageEvent { defer rows.Close() for rows.Next() { - var id string - var userID string - var body string - var messageType models.EventType - var hiddenAt *time.Time - var timestamp time.Time - - var userDisplayName *string - var userDisplayColor *int - var userCreatedAt *time.Time - var userDisabledAt *time.Time - var previousUsernames *string - var userNameChangedAt *time.Time + row := rowData{} // Convert a database row into a chat event - err = rows.Scan(&id, &userID, &body, &messageType, &hiddenAt, ×tamp, &userDisplayName, &userDisplayColor, &userCreatedAt, &userDisabledAt, &previousUsernames, &userNameChangedAt) - if err != nil { + if err = rows.Scan( + &row.id, + &row.userID, + &row.body, + &row.title, + &row.subtitle, + &row.image, + &row.link, + &row.eventType, + &row.hiddenAt, + &row.timestamp, + &row.userDisplayName, + &row.userDisplayColor, + &row.userCreatedAt, + &row.userDisabledAt, + &row.previousUsernames, + &row.userNameChangedAt, + &row.userScopes, + ); err != nil { log.Errorln("There is a problem converting query to chat objects. Please report this:", query) break } - // System messages and chat actions are special and are not from real users - if messageType == events.SystemMessageSent || messageType == events.ChatActionSent { - name := "Owncast" - userDisplayName = &name - color := 200 - userDisplayColor = &color - } + var message interface{} - if previousUsernames == nil { - previousUsernames = userDisplayName - } - - if userCreatedAt == nil { - now := time.Now() - userCreatedAt = &now - } - - user := user.User{ - ID: userID, - AccessToken: "", - DisplayName: *userDisplayName, - DisplayColor: *userDisplayColor, - CreatedAt: *userCreatedAt, - DisabledAt: userDisabledAt, - NameChangedAt: userNameChangedAt, - PreviousNames: strings.Split(*previousUsernames, ","), - } - - message := events.UserMessageEvent{ - Event: events.Event{ - Type: messageType, - ID: id, - Timestamp: timestamp, - }, - UserEvent: events.UserEvent{ - User: &user, - HiddenAt: hiddenAt, - }, - MessageEvent: events.MessageEvent{ - Body: body, - RawBody: body, - }, + switch row.eventType { + case events.MessageSent: + message = makeUserMessageEventFromRowData(row) + case events.SystemMessageSent: + message = makeSystemMessageChatEventFromRowData(row) + case events.ChatActionSent: + message = makeActionMessageChatEventFromRowData(row) + case events.FediverseEngagementFollow: + message = makeFederatedActionChatEventFromRowData(row) + case events.FediverseEngagementLike: + message = makeFederatedActionChatEventFromRowData(row) + case events.FediverseEngagementRepost: + message = makeFederatedActionChatEventFromRowData(row) } history = append(history, message) @@ -152,16 +258,16 @@ func getChat(query string) []events.UserMessageEvent { return history } -var _historyCache *[]events.UserMessageEvent +var _historyCache *[]interface{} // GetChatModerationHistory will return all the chat messages suitable for moderation purposes. -func GetChatModerationHistory() []events.UserMessageEvent { +func GetChatModerationHistory() []interface{} { if _historyCache != nil { return *_historyCache } // Get all messages regardless of visibility - query := "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" + query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" result := getChat(query) _historyCache = &result @@ -170,9 +276,9 @@ func GetChatModerationHistory() []events.UserMessageEvent { } // GetChatHistory will return all the chat messages suitable for returning as user-facing chat history. -func GetChatHistory() []events.UserMessageEvent { +func GetChatHistory() []interface{} { // Get all visible messages - query := fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber) + query := fmt.Sprintf("SELECT messages.id,messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.scopes FROM messages LEFT JOIN users ON messages.user_id = users.id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber) m := getChat(query) // Invert order of messages @@ -200,7 +306,7 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error { } for _, message := range messages { - ids = append(ids, message.ID) + ids = append(ids, message.(events.Event).ID) } // Tell the clients to hide/show these messages. @@ -250,35 +356,3 @@ func saveMessageVisibility(messageIDs []string, visible bool) error { return nil } - -// Only keep recent messages so we don't keep more chat data than needed -// for privacy and efficiency reasons. -func runPruner() { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - - log.Traceln("Removing chat messages older than", maxBacklogHours, "hours") - - deleteStatement := `DELETE FROM messages WHERE timestamp <= datetime('now', 'localtime', ?)` - tx, err := _datastore.DB.Begin() - if err != nil { - log.Debugln(err) - return - } - - stmt, err := tx.Prepare(deleteStatement) - if err != nil { - log.Debugln(err) - return - } - defer stmt.Close() - - if _, err = stmt.Exec(fmt.Sprintf("-%d hours", maxBacklogHours)); err != nil { - log.Debugln(err) - return - } - if err = tx.Commit(); err != nil { - log.Debugln(err) - return - } -} diff --git a/core/chat/pruner.go b/core/chat/pruner.go new file mode 100644 index 000000000..a9281b3df --- /dev/null +++ b/core/chat/pruner.go @@ -0,0 +1,39 @@ +package chat + +import ( + "fmt" + + log "github.com/sirupsen/logrus" +) + +// Only keep recent messages so we don't keep more chat data than needed +// for privacy and efficiency reasons. +func runPruner() { + _datastore.DbLock.Lock() + defer _datastore.DbLock.Unlock() + + log.Traceln("Removing chat messages older than", maxBacklogHours, "hours") + + deleteStatement := `DELETE FROM messages WHERE timestamp <= datetime('now', 'localtime', ?)` + tx, err := _datastore.DB.Begin() + if err != nil { + log.Debugln(err) + return + } + + stmt, err := tx.Prepare(deleteStatement) + if err != nil { + log.Debugln(err) + return + } + defer stmt.Close() + + if _, err = stmt.Exec(fmt.Sprintf("-%d hours", maxBacklogHours)); err != nil { + log.Debugln(err) + return + } + if err = tx.Commit(); err != nil { + log.Debugln(err) + return + } +} diff --git a/core/data/activitypub.go b/core/data/activitypub.go new file mode 100644 index 000000000..fd5310b38 --- /dev/null +++ b/core/data/activitypub.go @@ -0,0 +1,13 @@ +package data + +// GetFederatedInboxMap is a mapping between account names and their outbox. +func GetFederatedInboxMap() map[string]string { + return map[string]string{ + GetDefaultFederationUsername(): GetDefaultFederationUsername(), + } +} + +// GetDefaultFederationUsername will return the username used for sending federation activities. +func GetDefaultFederationUsername() string { + return GetFederationUsername() +} diff --git a/core/data/config.go b/core/data/config.go index 0eaa0f57d..cc5083743 100644 --- a/core/data/config.go +++ b/core/data/config.go @@ -13,36 +13,47 @@ import ( log "github.com/sirupsen/logrus" ) -const extraContentKey = "extra_page_content" -const streamTitleKey = "stream_title" -const streamKeyKey = "stream_key" -const logoPathKey = "logo_path" -const serverSummaryKey = "server_summary" -const serverWelcomeMessageKey = "server_welcome_message" -const serverNameKey = "server_name" -const serverURLKey = "server_url" -const httpPortNumberKey = "http_port_number" -const httpListenAddressKey = "http_listen_address" -const rtmpPortNumberKey = "rtmp_port_number" -const serverMetadataTagsKey = "server_metadata_tags" -const directoryEnabledKey = "directory_enabled" -const directoryRegistrationKeyKey = "directory_registration_key" -const socialHandlesKey = "social_handles" -const peakViewersSessionKey = "peak_viewers_session" -const peakViewersOverallKey = "peak_viewers_overall" -const lastDisconnectTimeKey = "last_disconnect_time" -const ffmpegPathKey = "ffmpeg_path" -const nsfwKey = "nsfw" -const s3StorageEnabledKey = "s3_storage_enabled" -const s3StorageConfigKey = "s3_storage_config" -const videoLatencyLevel = "video_latency_level" -const videoStreamOutputVariantsKey = "video_stream_output_variants" -const chatDisabledKey = "chat_disabled" -const externalActionsKey = "external_actions" -const customStylesKey = "custom_styles" -const videoCodecKey = "video_codec" -const blockedUsernamesKey = "blocked_usernames" -const suggestedUsernamesKey = "suggested_usernames" +const ( + extraContentKey = "extra_page_content" + streamTitleKey = "stream_title" + streamKeyKey = "stream_key" + logoPathKey = "logo_path" + serverSummaryKey = "server_summary" + serverWelcomeMessageKey = "server_welcome_message" + serverNameKey = "server_name" + serverURLKey = "server_url" + httpPortNumberKey = "http_port_number" + httpListenAddressKey = "http_listen_address" + rtmpPortNumberKey = "rtmp_port_number" + serverMetadataTagsKey = "server_metadata_tags" + directoryEnabledKey = "directory_enabled" + directoryRegistrationKeyKey = "directory_registration_key" + socialHandlesKey = "social_handles" + peakViewersSessionKey = "peak_viewers_session" + peakViewersOverallKey = "peak_viewers_overall" + lastDisconnectTimeKey = "last_disconnect_time" + ffmpegPathKey = "ffmpeg_path" + nsfwKey = "nsfw" + s3StorageEnabledKey = "s3_storage_enabled" + s3StorageConfigKey = "s3_storage_config" + videoLatencyLevel = "video_latency_level" + videoStreamOutputVariantsKey = "video_stream_output_variants" + chatDisabledKey = "chat_disabled" + externalActionsKey = "external_actions" + customStylesKey = "custom_styles" + videoCodecKey = "video_codec" + blockedUsernamesKey = "blocked_usernames" + publicKeyKey = "public_key" + privateKeyKey = "private_key" + serverInitDateKey = "server_init_date" + federationEnabledKey = "federation_enabled" + federationUsernameKey = "federation_username" + federationPrivateKey = "federation_private" + federationGoLiveMessageKey = "federation_go_live_message" + federationShowEngagementKey = "federation_show_engagement" + federationBlockedDomainsKey = "federation_blocked_domains" + suggestedUsernamesKey = "suggested_usernames" +) // GetExtraPageBodyContent will return the user-supplied body content. func GetExtraPageBodyContent() string { @@ -295,7 +306,7 @@ func GetSocialHandles() []models.SocialHandle { // SetSocialHandles will set the external social links. func SetSocialHandles(socialHandles []models.SocialHandle) error { - var configEntry = ConfigEntry{Key: socialHandlesKey, Value: socialHandles} + configEntry := ConfigEntry{Key: socialHandlesKey, Value: socialHandles} return _datastore.Save(configEntry) } @@ -350,7 +361,7 @@ func GetLastDisconnectTime() (*utils.NullTime, error) { // SetLastDisconnectTime will set the time the last stream ended. func SetLastDisconnectTime(disconnectTime time.Time) error { savedDisconnectTime := utils.NullTime{Time: disconnectTime, Valid: true} - var configEntry = ConfigEntry{Key: lastDisconnectTimeKey, Value: savedDisconnectTime} + configEntry := ConfigEntry{Key: lastDisconnectTimeKey, Value: savedDisconnectTime} return _datastore.Save(configEntry) } @@ -399,7 +410,7 @@ func GetS3Config() models.S3 { // SetS3Config will set the external storage configuration. func SetS3Config(config models.S3) error { - var configEntry = ConfigEntry{Key: s3StorageConfigKey, Value: config} + configEntry := ConfigEntry{Key: s3StorageConfigKey, Value: config} return _datastore.Save(configEntry) } @@ -457,7 +468,7 @@ func GetStreamOutputVariants() []models.StreamOutputVariant { // SetStreamOutputVariants will set the stream output variants. func SetStreamOutputVariants(variants []models.StreamOutputVariant) error { - var configEntry = ConfigEntry{Key: videoStreamOutputVariantsKey, Value: variants} + configEntry := ConfigEntry{Key: videoStreamOutputVariantsKey, Value: variants} return _datastore.Save(configEntry) } @@ -493,7 +504,7 @@ func GetExternalActions() []models.ExternalAction { // SetExternalActions will save external actions. func SetExternalActions(actions []models.ExternalAction) error { - var configEntry = ConfigEntry{Key: externalActionsKey, Value: actions} + configEntry := ConfigEntry{Key: externalActionsKey, Value: actions} return _datastore.Save(configEntry) } @@ -583,7 +594,6 @@ func FindHighestVideoQualityIndex(qualities []models.StreamOutputVariant) int { // GetForbiddenUsernameList will return the blocked usernames as a comma separated string. func GetForbiddenUsernameList() []string { usernameString, err := _datastore.GetString(blockedUsernamesKey) - if err != nil { return config.DefaultForbiddenUsernames } @@ -622,3 +632,125 @@ func SetSuggestedUsernamesList(usernames []string) error { usernameListString := strings.Join(usernames, ",") return _datastore.SetString(suggestedUsernamesKey, usernameListString) } + +// GetServerInitTime will return when the server was first setup. +func GetServerInitTime() (*utils.NullTime, error) { + var t utils.NullTime + + configEntry, err := _datastore.Get(serverInitDateKey) + if err != nil { + return nil, err + } + + if err := configEntry.getObject(&t); err != nil { + return nil, err + } + + if !t.Valid { + return nil, err + } + + return &t, nil +} + +// SetServerInitTime will set when the server was first created. +func SetServerInitTime(t time.Time) error { + nt := utils.NullTime{Time: t, Valid: true} + configEntry := ConfigEntry{Key: serverInitDateKey, Value: nt} + return _datastore.Save(configEntry) +} + +// SetFederationEnabled will enable federation if set to true. +func SetFederationEnabled(enabled bool) error { + return _datastore.SetBool(federationEnabledKey, enabled) +} + +// GetFederationEnabled will return if federation is enabled. +func GetFederationEnabled() bool { + enabled, err := _datastore.GetBool(federationEnabledKey) + if err == nil { + return enabled + } + + return false +} + +// SetFederationUsername will set the username used in federated activities. +func SetFederationUsername(username string) error { + return _datastore.SetString(federationUsernameKey, username) +} + +// GetFederationUsername will return the username used in federated activities. +func GetFederationUsername() string { + username, err := _datastore.GetString(federationUsernameKey) + if username == "" || err != nil { + return config.GetDefaults().FederationUsername + } + + return username +} + +// SetFederationGoLiveMessage will set the message sent when going live. +func SetFederationGoLiveMessage(message string) error { + return _datastore.SetString(federationGoLiveMessageKey, message) +} + +// GetFederationGoLiveMessage will return the message sent when going live. +func GetFederationGoLiveMessage() string { + // Empty message means it's disabled. + message, err := _datastore.GetString(federationGoLiveMessageKey) + if err != nil { + log.Traceln("unable to fetch go live message.", err) + } + + return message +} + +// SetFederationIsPrivate will set if federation activity is private. +func SetFederationIsPrivate(isPrivate bool) error { + return _datastore.SetBool(federationPrivateKey, isPrivate) +} + +// GetFederationIsPrivate will return if federation is private. +func GetFederationIsPrivate() bool { + isPrivate, err := _datastore.GetBool(federationPrivateKey) + if err == nil { + return isPrivate + } + + return false +} + +// SetFederationShowEngagement will set if fediverse engagement shows in chat. +func SetFederationShowEngagement(showEngagement bool) error { + return _datastore.SetBool(federationShowEngagementKey, showEngagement) +} + +// GetFederationShowEngagement will return if fediverse engagement shows in chat. +func GetFederationShowEngagement() bool { + showEngagement, err := _datastore.GetBool(federationShowEngagementKey) + if err == nil { + return showEngagement + } + + return true +} + +// SetBlockedFederatedDomains will set the blocked federated domains. +func SetBlockedFederatedDomains(domains []string) error { + return _datastore.SetString(federationBlockedDomainsKey, strings.Join(domains, ",")) +} + +// GetBlockedFederatedDomains will return a list of blocked federated domains. +func GetBlockedFederatedDomains() []string { + domains, err := _datastore.GetString(federationBlockedDomainsKey) + if err != nil { + return []string{} + } + + if domains == "" { + return []string{} + } + + return strings.Split(domains, ",") +} diff --git a/core/data/crypto.go b/core/data/crypto.go new file mode 100644 index 000000000..8bd394a8a --- /dev/null +++ b/core/data/crypto.go @@ -0,0 +1,23 @@ +package data + +// GetPublicKey will return the public key. +func GetPublicKey() string { + value, _ := _datastore.GetString(publicKeyKey) + return value +} + +// SetPublicKey will save the public key. +func SetPublicKey(key string) error { + return _datastore.SetString(publicKeyKey, key) +} + +// GetPrivateKey will return the private key. +func GetPrivateKey() string { + value, _ := _datastore.GetString(privateKeyKey) + return value +} + +// SetPrivateKey will save the private key. +func SetPrivateKey(key string) error { + return _datastore.SetString(privateKeyKey, key) +} diff --git a/core/data/data.go b/core/data/data.go index bf5086316..f233d542c 100644 --- a/core/data/data.go +++ b/core/data/data.go @@ -17,11 +17,13 @@ import ( ) const ( - schemaVersion = 1 + schemaVersion = 3 ) -var _db *sql.DB -var _datastore *Datastore +var ( + _db *sql.DB + _datastore *Datastore +) // GetDatabase will return the shared instance of the actual database. func GetDatabase() *sql.DB { @@ -35,18 +37,34 @@ func GetStore() *Datastore { // SetupPersistence will open the datastore and make it available. func SetupPersistence(file string) error { - // Create empty DB file if it doesn't exist. - if !utils.DoesFileExists(file) { - log.Traceln("Creating new database at", file) + // Allow support for in-memory databases for tests. - _, err := os.Create(file) + var db *sql.DB + + if file == ":memory:" { + inMemoryDb, err := sql.Open("sqlite3", file) if err != nil { log.Fatal(err.Error()) } - } + db = inMemoryDb + } else { + // Create empty DB file if it doesn't exist. + if !utils.DoesFileExists(file) { + log.Traceln("Creating new database at", file) - db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_cache_size=10000&cache=shared&_journal_mode=WAL", file)) - db.SetMaxOpenConns(1) + _, err := os.Create(file) + if err != nil { + log.Fatal(err.Error()) + } + } + + onDiskDb, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_cache_size=10000&cache=shared&_journal_mode=WAL", file)) + if err != nil { + return err + } + db = onDiskDb + db.SetMaxOpenConns(1) + } _db = db // Some SQLite optimizations @@ -58,10 +76,6 @@ func SetupPersistence(file string) error { createWebhooksTable() createUsersTable(db) - if err != nil { - return err - } - if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS config ( "key" string NOT NULL PRIMARY KEY, "value" TEXT @@ -70,7 +84,7 @@ func SetupPersistence(file string) error { } var version int - err = db.QueryRow("SELECT value FROM config WHERE key='version'"). + err := db.QueryRow("SELECT value FROM config WHERE key='version'"). Scan(&version) if err != nil { if err != sql.ErrNoRows { @@ -117,10 +131,14 @@ func migrateDatabase(db *sql.DB, from, to int) error { dbBackupFile := filepath.Join(config.BackupDirectory, fmt.Sprintf("owncast-v%d.bak", from)) utils.Backup(db, dbBackupFile) for v := from; v < to; v++ { + log.Tracef("Migration step from %d to %d\n", v, v+1) switch v { case 0: - log.Tracef("Migration step from %d to %d\n", v, v+1) migrateToSchema1(db) + case 1: + migrateToSchema2(db) + case 2: + migrateToSchema3(db) default: log.Fatalln("missing database migration step") } diff --git a/core/data/defaults.go b/core/data/defaults.go index ad1810ecb..7ecf868ad 100644 --- a/core/data/defaults.go +++ b/core/data/defaults.go @@ -14,6 +14,14 @@ func HasPopulatedDefaults() bool { return hasPopulated } +func hasPopulatedFederationDefaults() bool { + hasPopulated, err := _datastore.GetBool("HAS_POPULATED_FEDERATION_DEFAULTS") + if err != nil { + return false + } + return hasPopulated +} + // PopulateDefaults will set default values in the database. func PopulateDefaults() { defaults := config.GetDefaults() @@ -32,6 +40,7 @@ func PopulateDefaults() { _ = SetServerName("Owncast") _ = SetStreamKey(defaults.StreamKey) _ = SetExtraPageBodyContent("This is your page's content that can be edited in the admin.") + _ = SetFederationGoLiveMessage(defaults.FederationGoLiveMessage) _ = SetSocialHandles([]models.SocialHandle{ { Platform: "github", diff --git a/core/data/messages.go b/core/data/messages.go index efb9d49e9..163fd4cd2 100644 --- a/core/data/messages.go +++ b/core/data/messages.go @@ -15,6 +15,10 @@ func CreateMessagesTable(db *sql.DB) { "eventType" TEXT, "hidden_at" DATETIME, "timestamp" DATETIME, + "title" TEXT, + "subtitle" TEXT, + "image" TEXT, + "link" TEXT, PRIMARY KEY (id) );CREATE INDEX index ON messages (id, user_id, hidden_at, timestamp); CREATE INDEX id ON messages (id); diff --git a/core/data/migrations.go b/core/data/migrations.go index 270d5f3d5..178e5fba2 100644 --- a/core/data/migrations.go +++ b/core/data/migrations.go @@ -9,6 +9,44 @@ import ( "github.com/teris-io/shortid" ) +func migrateToSchema3(db *sql.DB) { + // Since it's just a backlog of chat messages let's wipe the old messages + // and recreate the table. + + // Drop the old messages table + stmt, err := db.Prepare("DROP TABLE messages") + if err != nil { + log.Fatal(err) + } + defer stmt.Close() + _, err = stmt.Exec() + if err != nil { + log.Warnln(err) + } + + // Recreate it + CreateMessagesTable(db) +} + +func migrateToSchema2(db *sql.DB) { + // Since it's just a backlog of chat messages let's wipe the old messages + // and recreate the table. + + // Drop the old messages table + stmt, err := db.Prepare("DROP TABLE messages") + if err != nil { + log.Fatal(err) + } + defer stmt.Close() + _, err = stmt.Exec() + if err != nil { + log.Warnln(err) + } + + // Recreate it + CreateMessagesTable(db) +} + func migrateToSchema1(db *sql.DB) { // Since it's just a backlog of chat messages let's wipe the old messages // and recreate the table. @@ -100,7 +138,6 @@ func insertAPIToken(db *sql.DB, token string, name string, color int, scopes str return err } stmt, err := tx.Prepare("INSERT INTO users(id, access_token, display_name, display_color, scopes, type) values(?, ?, ?, ?, ?, ?)") - if err != nil { return err } diff --git a/core/data/persistence.go b/core/data/persistence.go index d1a6c9432..f8bf57bbe 100644 --- a/core/data/persistence.go +++ b/core/data/persistence.go @@ -5,9 +5,12 @@ import ( "database/sql" "encoding/gob" "sync" + "time" // sqlite requires a blank import. _ "github.com/mattn/go-sqlite3" + "github.com/owncast/owncast/config" + "github.com/owncast/owncast/db" log "github.com/sirupsen/logrus" ) @@ -37,6 +40,11 @@ func (ds *Datastore) warmCache() { } } +// GetQueries will return the shared instance of the SQL query generator. +func (ds *Datastore) GetQueries() *db.Queries { + return db.New(ds.DB) +} + // Get will query the database for the key and return the entry. func (ds *Datastore) Get(key string) (ConfigEntry, error) { cachedValue, err := ds.GetCachedValue(key) @@ -125,6 +133,20 @@ func (ds *Datastore) Setup() { if !HasPopulatedDefaults() { PopulateDefaults() } + + if !hasPopulatedFederationDefaults() { + if err := SetFederationGoLiveMessage(config.GetDefaults().FederationGoLiveMessage); err != nil { + log.Errorln(err) + } + if err := _datastore.SetBool("HAS_POPULATED_FEDERATION_DEFAULTS", true); err != nil { + log.Errorln(err) + } + } + + // Set the server initialization date if needed. + if hasSetInitDate, _ := GetServerInitTime(); hasSetInitDate == nil || !hasSetInitDate.Valid { + _ = SetServerInitTime(time.Now()) + } } // Reset will delete all config entries in the datastore and start over. diff --git a/core/data/users.go b/core/data/users.go index 65a45b689..dc5f1ad17 100644 --- a/core/data/users.go +++ b/core/data/users.go @@ -24,6 +24,7 @@ func createUsersTable(db *sql.DB) { PRIMARY KEY (id) );CREATE INDEX index ON users (id, access_token, disabled_at); CREATE INDEX id ON users (id); + CREATE INDEX id_disabled ON users (id, disabled_at); CREATE INDEX access_token ON users (access_token); CREATE INDEX disabled_at ON USERS (disabled_at);` diff --git a/core/streamState.go b/core/streamState.go index 4b4d32db3..f4184f7d2 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -1,11 +1,13 @@ package core import ( + "context" "io" "time" log "github.com/sirupsen/logrus" + "github.com/owncast/owncast/activitypub" "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" @@ -24,6 +26,8 @@ var _onlineCleanupTicker *time.Ticker var _currentBroadcast *models.CurrentBroadcast +var _onlineTimerCancelFunc context.CancelFunc + // setStreamAsConnected sets the stream as connected. func setStreamAsConnected(rtmpOut *io.PipeReader) { now := utils.NullTime{Time: time.Now(), Valid: true} @@ -66,6 +70,11 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) { _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true) chat.SendAllWelcomeMessage() + + // Send a delayed live Federated message. + if data.GetFederationEnabled() { + _onlineTimerCancelFunc = startFederatedLiveStreamMessageTimer() + } } // SetStreamAsDisconnected sets the stream as disconnected. @@ -73,6 +82,10 @@ func SetStreamAsDisconnected() { _ = chat.SendSystemAction("The stream is ending.", true) now := utils.NullTime{Time: time.Now(), Valid: true} + if _onlineTimerCancelFunc != nil { + _onlineTimerCancelFunc() + } + _stats.StreamConnected = false _stats.LastDisconnectTime = &now _stats.LastConnectTime = nil @@ -147,3 +160,20 @@ func stopOnlineCleanupTimer() { _onlineCleanupTicker.Stop() } } + +func startFederatedLiveStreamMessageTimer() context.CancelFunc { + // Send a delayed live Federated message. + c, cancelFunc := context.WithCancel(context.Background()) + _onlineTimerCancelFunc = cancelFunc + go func(c context.Context) { + select { + case <-time.After(time.Minute * 2.0): + log.Traceln("Sending Federated Go Live message.") + if err := activitypub.SendLive(); err != nil { + log.Errorln(err) + } + case <-c.Done(): + } + }(c) + return cancelFunc +} diff --git a/core/transcoder/thumbnailGenerator.go b/core/transcoder/thumbnailGenerator.go index ce1e01bfd..faa0c6a8b 100644 --- a/core/transcoder/thumbnailGenerator.go +++ b/core/transcoder/thumbnailGenerator.go @@ -110,10 +110,7 @@ func fireThumbnailGenerator(segmentPath string, variantIndex int) error { log.Errorln(err) } - // If YP support is enabled also create an animated GIF preview - if data.GetDirectoryEnabled() { - makeAnimatedGifPreview(mostRecentFile, previewGifFile) - } + makeAnimatedGifPreview(mostRecentFile, previewGifFile) return nil } diff --git a/db/README.md b/db/README.md new file mode 100644 index 000000000..331fb3313 --- /dev/null +++ b/db/README.md @@ -0,0 +1,28 @@ +# SQL Queries + +sqlc generates **type-safe code** from SQL. Here's how it works: + +1. You define the schema in `schema.sql`. +1. You write your queries in `query.sql` using regular SQL. +1. You run `sqlc generate` to generate Go code with type-safe interfaces to those queries. +1. You write application code that calls the generated code. + +Only those who need to create or update SQL queries will need to have `sqlc` installed on their system. **It is not a dependency required to build the codebase.** + +## Install sqlc + +### Snap + +`sudo snap install sqlc` + +### Go install + +`go install github.com/kyleconroy/sqlc/cmd/sqlc@latest` + +### macOS + +`brew install sqlc` + +### Download a release + +VisitWhat is your stream about today?
What is your stream about today?