2021-05-08 15:25:55 +03:00
/ *
GoToSocial
Copyright ( C ) 2021 GoToSocial Authors admin @ gotosocial . org
This program is free software : you can redistribute it and / or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation , either version 3 of the License , or
( at your option ) any later version .
This program is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU Affero General Public License for more details .
You should have received a copy of the GNU Affero General Public License
along with this program . If not , see < http : //www.gnu.org/licenses/>.
* /
2021-05-30 14:12:00 +03:00
package processing
2021-05-08 15:25:55 +03:00
import (
2021-05-15 12:58:11 +03:00
"context"
2021-05-08 15:25:55 +03:00
"net/http"
2021-07-05 14:23:03 +03:00
"net/url"
2021-05-08 15:25:55 +03:00
"github.com/sirupsen/logrus"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
2021-05-30 14:12:00 +03:00
"github.com/superseriousbusiness/gotosocial/internal/blob"
2021-05-08 15:25:55 +03:00
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/federation"
2021-06-13 19:42:28 +03:00
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
2021-05-08 15:25:55 +03:00
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
2021-07-05 14:23:03 +03:00
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/admin"
mediaProcessor "github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/processing/status"
"github.com/superseriousbusiness/gotosocial/internal/processing/streaming"
2021-06-13 19:42:28 +03:00
"github.com/superseriousbusiness/gotosocial/internal/timeline"
2021-05-08 15:25:55 +03:00
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
2021-06-17 19:02:33 +03:00
"github.com/superseriousbusiness/gotosocial/internal/visibility"
2021-05-08 15:25:55 +03:00
)
// Processor should be passed to api modules (see internal/apimodule/...). It is used for
// passing messages back and forth from the client API and the federating interface, via channels.
// It also contains logic for filtering which messages should end up where.
// It is designed to be used asynchronously: the client API and the federating API should just be able to
// fire messages into the processor and not wait for a reply before proceeding with other work. This allows
// for clean distribution of messages without slowing down the client API and harming the user experience.
type Processor interface {
// Start starts the Processor, reading from its channels and passing messages back and forth.
2021-08-25 16:34:33 +03:00
Start ( ctx context . Context ) error
2021-05-08 15:25:55 +03:00
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
Stop ( ) error
/ *
CLIENT API - FACING PROCESSING FUNCTIONS
These functions are intended to be called when the API client needs an immediate ( ie . , synchronous ) reply
to an HTTP request . As such , they will only do the bare - minimum of work necessary to give a properly
formed reply . For more intensive ( and time - consuming ) calls , where you don ' t require an immediate
response , pass work to the processor using a channel instead .
* /
// AccountCreate processes the given form for creating a new account, returning an oauth token for that account if successful.
2021-08-25 16:34:33 +03:00
AccountCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . AccountCreateRequest ) ( * apimodel . Token , error )
2021-05-08 15:25:55 +03:00
// AccountGet processes the given request for account information.
2021-08-25 16:34:33 +03:00
AccountGet ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( * apimodel . Account , error )
2021-05-08 15:25:55 +03:00
// AccountUpdate processes the update of an account with the given form
2021-08-25 16:34:33 +03:00
AccountUpdate ( ctx context . Context , authed * oauth . Auth , form * apimodel . UpdateCredentialsRequest ) ( * apimodel . Account , error )
2021-05-17 20:06:58 +03:00
// AccountStatusesGet fetches a number of statuses (in time descending order) from the given account, filtered by visibility for
// the account given in authed.
2021-08-25 16:34:33 +03:00
AccountStatusesGet ( ctx context . Context , authed * oauth . Auth , targetAccountID string , limit int , excludeReplies bool , maxID string , pinned bool , mediaOnly bool ) ( [ ] apimodel . Status , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
// AccountFollowersGet fetches a list of the target account's followers.
2021-08-25 16:34:33 +03:00
AccountFollowersGet ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( [ ] apimodel . Account , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
// AccountFollowingGet fetches a list of the accounts that target account is following.
2021-08-25 16:34:33 +03:00
AccountFollowingGet ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( [ ] apimodel . Account , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
// AccountRelationshipGet returns a relationship model describing the relationship of the targetAccount to the Authed account.
2021-08-25 16:34:33 +03:00
AccountRelationshipGet ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( * apimodel . Relationship , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
// AccountFollowCreate handles a follow request to an account, either remote or local.
2021-08-25 16:34:33 +03:00
AccountFollowCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . AccountFollowRequest ) ( * apimodel . Relationship , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
// AccountFollowRemove handles the removal of a follow/follow request to an account, either remote or local.
2021-08-25 16:34:33 +03:00
AccountFollowRemove ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( * apimodel . Relationship , gtserror . WithCode )
2021-07-11 17:22:21 +03:00
// AccountBlockCreate handles the creation of a block from authed account to target account, either remote or local.
2021-08-25 16:34:33 +03:00
AccountBlockCreate ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( * apimodel . Relationship , gtserror . WithCode )
2021-07-11 17:22:21 +03:00
// AccountBlockRemove handles the removal of a block from authed account to target account, either remote or local.
2021-08-25 16:34:33 +03:00
AccountBlockRemove ( ctx context . Context , authed * oauth . Auth , targetAccountID string ) ( * apimodel . Relationship , gtserror . WithCode )
2021-05-08 15:25:55 +03:00
2021-05-09 15:06:06 +03:00
// AdminEmojiCreate handles the creation of a new instance emoji by an admin, using the given form.
2021-08-25 16:34:33 +03:00
AdminEmojiCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . EmojiCreateRequest ) ( * apimodel . Emoji , error )
2021-07-05 14:23:03 +03:00
// AdminDomainBlockCreate handles the creation of a new domain block by an admin, using the given form.
2021-08-25 16:34:33 +03:00
AdminDomainBlockCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . DomainBlockCreateRequest ) ( * apimodel . DomainBlock , gtserror . WithCode )
2021-07-06 14:29:11 +03:00
// AdminDomainBlocksImport handles the import of multiple domain blocks by an admin, using the given form.
2021-08-25 16:34:33 +03:00
AdminDomainBlocksImport ( ctx context . Context , authed * oauth . Auth , form * apimodel . DomainBlockCreateRequest ) ( [ ] * apimodel . DomainBlock , gtserror . WithCode )
2021-07-05 14:23:03 +03:00
// AdminDomainBlocksGet returns a list of currently blocked domains.
2021-08-25 16:34:33 +03:00
AdminDomainBlocksGet ( ctx context . Context , authed * oauth . Auth , export bool ) ( [ ] * apimodel . DomainBlock , gtserror . WithCode )
2021-07-05 14:23:03 +03:00
// AdminDomainBlockGet returns one domain block, specified by ID.
2021-08-25 16:34:33 +03:00
AdminDomainBlockGet ( ctx context . Context , authed * oauth . Auth , id string , export bool ) ( * apimodel . DomainBlock , gtserror . WithCode )
2021-07-05 14:23:03 +03:00
// AdminDomainBlockDelete deletes one domain block, specified by ID, returning the deleted domain block.
2021-08-25 16:34:33 +03:00
AdminDomainBlockDelete ( ctx context . Context , authed * oauth . Auth , id string ) ( * apimodel . DomainBlock , gtserror . WithCode )
2021-05-09 15:06:06 +03:00
2021-05-08 15:25:55 +03:00
// AppCreate processes the creation of a new API application
2021-08-25 16:34:33 +03:00
AppCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . ApplicationCreateRequest ) ( * apimodel . Application , error )
2021-05-08 15:25:55 +03:00
2021-07-11 17:22:21 +03:00
// BlocksGet returns a list of accounts blocked by the requesting account.
2021-08-25 16:34:33 +03:00
BlocksGet ( ctx context . Context , authed * oauth . Auth , maxID string , sinceID string , limit int ) ( * apimodel . BlocksResponse , gtserror . WithCode )
2021-07-11 17:22:21 +03:00
2021-05-10 17:29:05 +03:00
// FileGet handles the fetching of a media attachment file via the fileserver.
2021-08-25 16:34:33 +03:00
FileGet ( ctx context . Context , authed * oauth . Auth , form * apimodel . GetContentRequestForm ) ( * apimodel . Content , error )
2021-05-10 17:29:05 +03:00
2021-05-15 12:58:11 +03:00
// FollowRequestsGet handles the getting of the authed account's incoming follow requests
2021-08-25 16:34:33 +03:00
FollowRequestsGet ( ctx context . Context , auth * oauth . Auth ) ( [ ] apimodel . Account , gtserror . WithCode )
2021-05-15 12:58:11 +03:00
// FollowRequestAccept handles the acceptance of a follow request from the given account ID
2021-08-25 16:34:33 +03:00
FollowRequestAccept ( ctx context . Context , auth * oauth . Auth , accountID string ) ( * apimodel . Relationship , gtserror . WithCode )
2021-05-15 12:58:11 +03:00
2021-05-09 15:06:06 +03:00
// InstanceGet retrieves instance information for serving at api/v1/instance
2021-08-25 16:34:33 +03:00
InstanceGet ( ctx context . Context , domain string ) ( * apimodel . Instance , gtserror . WithCode )
2021-06-23 17:35:57 +03:00
// InstancePatch updates this instance according to the given form.
//
// It should already be ascertained that the requesting account is authenticated and an admin.
2021-08-25 16:34:33 +03:00
InstancePatch ( ctx context . Context , form * apimodel . InstanceSettingsUpdateRequest ) ( * apimodel . Instance , gtserror . WithCode )
2021-05-09 15:06:06 +03:00
// MediaCreate handles the creation of a media attachment, using the given form.
2021-08-25 16:34:33 +03:00
MediaCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . AttachmentRequest ) ( * apimodel . Attachment , error )
2021-05-10 17:29:05 +03:00
// MediaGet handles the GET of a media attachment with the given ID
2021-08-25 16:34:33 +03:00
MediaGet ( ctx context . Context , authed * oauth . Auth , attachmentID string ) ( * apimodel . Attachment , gtserror . WithCode )
2021-05-10 17:29:05 +03:00
// MediaUpdate handles the PUT of a media attachment with the given ID and form
2021-08-25 16:34:33 +03:00
MediaUpdate ( ctx context . Context , authed * oauth . Auth , attachmentID string , form * apimodel . AttachmentUpdateRequest ) ( * apimodel . Attachment , gtserror . WithCode )
2021-05-09 15:06:06 +03:00
2021-05-27 17:06:24 +03:00
// NotificationsGet
2021-08-25 16:34:33 +03:00
NotificationsGet ( ctx context . Context , authed * oauth . Auth , limit int , maxID string , sinceID string ) ( [ ] * apimodel . Notification , gtserror . WithCode )
2021-05-27 17:06:24 +03:00
2021-05-29 20:36:54 +03:00
// SearchGet performs a search with the given params, resolving/dereferencing remotely as desired
2021-08-25 16:34:33 +03:00
SearchGet ( ctx context . Context , authed * oauth . Auth , searchQuery * apimodel . SearchQuery ) ( * apimodel . SearchResult , gtserror . WithCode )
2021-05-29 20:36:54 +03:00
2021-05-08 15:25:55 +03:00
// StatusCreate processes the given form to create a new status, returning the api model representation of that status if it's OK.
2021-08-25 16:34:33 +03:00
StatusCreate ( ctx context . Context , authed * oauth . Auth , form * apimodel . AdvancedStatusCreateForm ) ( * apimodel . Status , error )
2021-05-08 15:25:55 +03:00
// StatusDelete processes the delete of a given status, returning the deleted status if the delete goes through.
2021-08-25 16:34:33 +03:00
StatusDelete ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Status , error )
2021-05-08 15:25:55 +03:00
// StatusFave processes the faving of a given status, returning the updated status if the fave goes through.
2021-08-25 16:34:33 +03:00
StatusFave ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Status , error )
2021-05-08 16:16:24 +03:00
// StatusBoost processes the boost/reblog of a given status, returning the newly-created boost if all is well.
2021-08-25 16:34:33 +03:00
StatusBoost ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Status , gtserror . WithCode )
2021-06-21 16:56:00 +03:00
// StatusUnboost processes the unboost/unreblog of a given status, returning the status if all is well.
2021-08-25 16:34:33 +03:00
StatusUnboost ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Status , gtserror . WithCode )
2021-05-31 18:36:35 +03:00
// StatusBoostedBy returns a slice of accounts that have boosted the given status, filtered according to privacy settings.
2021-08-25 16:34:33 +03:00
StatusBoostedBy ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( [ ] * apimodel . Account , gtserror . WithCode )
2021-05-08 15:25:55 +03:00
// StatusFavedBy returns a slice of accounts that have liked the given status, filtered according to privacy settings.
2021-08-25 16:34:33 +03:00
StatusFavedBy ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( [ ] * apimodel . Account , error )
2021-05-08 15:25:55 +03:00
// StatusGet gets the given status, taking account of privacy settings and blocks etc.
2021-08-25 16:34:33 +03:00
StatusGet ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Status , error )
2021-05-08 15:25:55 +03:00
// StatusUnfave processes the unfaving of a given status, returning the updated status if the fave goes through.
2021-08-25 16:34:33 +03:00
StatusUnfave ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Status , error )
2021-05-31 18:36:35 +03:00
// StatusGetContext returns the context (previous and following posts) from the given status ID
2021-08-25 16:34:33 +03:00
StatusGetContext ( ctx context . Context , authed * oauth . Auth , targetStatusID string ) ( * apimodel . Context , gtserror . WithCode )
2021-05-08 15:25:55 +03:00
2021-05-22 00:04:59 +03:00
// HomeTimelineGet returns statuses from the home timeline, with the given filters/parameters.
2021-08-25 16:34:33 +03:00
HomeTimelineGet ( ctx context . Context , authed * oauth . Auth , maxID string , sinceID string , minID string , limit int , local bool ) ( * apimodel . StatusTimelineResponse , gtserror . WithCode )
2021-05-31 18:36:35 +03:00
// PublicTimelineGet returns statuses from the public/local timeline, with the given filters/parameters.
2021-08-25 16:34:33 +03:00
PublicTimelineGet ( ctx context . Context , authed * oauth . Auth , maxID string , sinceID string , minID string , limit int , local bool ) ( * apimodel . StatusTimelineResponse , gtserror . WithCode )
2021-07-09 19:32:48 +03:00
// FavedTimelineGet returns faved statuses, with the given filters/parameters.
2021-08-25 16:34:33 +03:00
FavedTimelineGet ( ctx context . Context , authed * oauth . Auth , maxID string , minID string , limit int ) ( * apimodel . StatusTimelineResponse , gtserror . WithCode )
2021-05-22 00:04:59 +03:00
2021-06-19 12:18:55 +03:00
// AuthorizeStreamingRequest returns a gotosocial account in exchange for an access token, or an error if the given token is not valid.
2021-08-25 16:34:33 +03:00
AuthorizeStreamingRequest ( ctx context . Context , accessToken string ) ( * gtsmodel . Account , error )
2021-06-19 12:18:55 +03:00
// OpenStreamForAccount opens a new stream for the given account, with the given stream type.
2021-08-25 16:34:33 +03:00
OpenStreamForAccount ( ctx context . Context , account * gtsmodel . Account , streamType string ) ( * gtsmodel . Stream , gtserror . WithCode )
2021-06-19 12:18:55 +03:00
2021-05-08 15:25:55 +03:00
/ *
FEDERATION API - FACING PROCESSING FUNCTIONS
These functions are intended to be called when the federating client needs an immediate ( ie . , synchronous ) reply
to an HTTP request . As such , they will only do the bare - minimum of work necessary to give a properly
formed reply . For more intensive ( and time - consuming ) calls , where you don ' t require an immediate
response , pass work to the processor using a channel instead .
* /
// GetFediUser handles the getting of a fedi/activitypub representation of a user/account, performing appropriate authentication
// before returning a JSON serializable interface to the caller.
2021-07-05 14:23:03 +03:00
GetFediUser ( ctx context . Context , requestedUsername string , requestURL * url . URL ) ( interface { } , gtserror . WithCode )
2021-05-09 21:34:27 +03:00
2021-05-21 16:48:26 +03:00
// GetFediFollowers handles the getting of a fedi/activitypub representation of a user/account's followers, performing appropriate
// authentication before returning a JSON serializable interface to the caller.
2021-07-05 14:23:03 +03:00
GetFediFollowers ( ctx context . Context , requestedUsername string , requestURL * url . URL ) ( interface { } , gtserror . WithCode )
2021-05-23 19:07:04 +03:00
// GetFediFollowing handles the getting of a fedi/activitypub representation of a user/account's following, performing appropriate
// authentication before returning a JSON serializable interface to the caller.
2021-07-05 14:23:03 +03:00
GetFediFollowing ( ctx context . Context , requestedUsername string , requestURL * url . URL ) ( interface { } , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
// GetFediStatus handles the getting of a fedi/activitypub representation of a particular status, performing appropriate
// authentication before returning a JSON serializable interface to the caller.
2021-07-05 14:23:03 +03:00
GetFediStatus ( ctx context . Context , requestedUsername string , requestedStatusID string , requestURL * url . URL ) ( interface { } , gtserror . WithCode )
2021-05-21 16:48:26 +03:00
2021-08-10 14:32:39 +03:00
// GetFediStatus handles the getting of a fedi/activitypub representation of replies to a status, performing appropriate
// authentication before returning a JSON serializable interface to the caller.
GetFediStatusReplies ( ctx context . Context , requestedUsername string , requestedStatusID string , page bool , onlyOtherAccounts bool , minID string , requestURL * url . URL ) ( interface { } , gtserror . WithCode )
2021-05-09 21:34:27 +03:00
// GetWebfingerAccount handles the GET for a webfinger resource. Most commonly, it will be used for returning account lookups.
2021-07-05 14:23:03 +03:00
GetWebfingerAccount ( ctx context . Context , requestedUsername string , requestURL * url . URL ) ( * apimodel . WellKnownResponse , gtserror . WithCode )
2021-06-24 15:26:08 +03:00
// GetNodeInfoRel returns a well known response giving the path to node info.
2021-08-25 16:34:33 +03:00
GetNodeInfoRel ( ctx context . Context , request * http . Request ) ( * apimodel . WellKnownResponse , gtserror . WithCode )
2021-06-24 15:26:08 +03:00
// GetNodeInfo returns a node info struct in response to a node info request.
2021-08-25 16:34:33 +03:00
GetNodeInfo ( ctx context . Context , request * http . Request ) ( * apimodel . Nodeinfo , gtserror . WithCode )
2021-05-15 12:58:11 +03:00
// InboxPost handles POST requests to a user's inbox for new activitypub messages.
//
// InboxPost returns true if the request was handled as an ActivityPub POST to an actor's inbox.
// If false, the request was not an ActivityPub request and may still be handled by the caller in another way, such as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response has already been written. If a non-nil error is returned, then no response has been written.
//
// If the Actor was constructed with the Federated Protocol enabled, side effects will occur.
//
// If the Federated Protocol is not enabled, writes the http.StatusMethodNotAllowed status code in the response. No side effects occur.
InboxPost ( ctx context . Context , w http . ResponseWriter , r * http . Request ) ( bool , error )
2021-05-08 15:25:55 +03:00
}
// processor just implements the Processor interface
type processor struct {
2021-06-13 19:42:28 +03:00
fromClientAPI chan gtsmodel . FromClientAPI
fromFederator chan gtsmodel . FromFederator
federator federation . Federator
stop chan interface { }
log * logrus . Logger
config * config . Config
tc typeutils . TypeConverter
oauthServer oauth . Server
mediaHandler media . Handler
storage blob . Storage
timelineManager timeline . Manager
db db . DB
2021-06-17 19:02:33 +03:00
filter visibility . Filter
2021-05-08 15:25:55 +03:00
2021-06-13 19:42:28 +03:00
/ *
SUB - PROCESSORS
* /
2021-07-05 14:23:03 +03:00
accountProcessor account . Processor
adminProcessor admin . Processor
2021-06-19 12:18:55 +03:00
statusProcessor status . Processor
streamingProcessor streaming . Processor
2021-07-05 14:23:03 +03:00
mediaProcessor mediaProcessor . Processor
2021-05-08 15:25:55 +03:00
}
2021-06-13 19:42:28 +03:00
// NewProcessor returns a new Processor that uses the given federator and logger
func NewProcessor ( config * config . Config , tc typeutils . TypeConverter , federator federation . Federator , oauthServer oauth . Server , mediaHandler media . Handler , storage blob . Storage , timelineManager timeline . Manager , db db . DB , log * logrus . Logger ) Processor {
2021-05-08 15:25:55 +03:00
2021-06-13 19:42:28 +03:00
fromClientAPI := make ( chan gtsmodel . FromClientAPI , 1000 )
fromFederator := make ( chan gtsmodel . FromFederator , 1000 )
2021-05-08 15:25:55 +03:00
2021-06-13 19:42:28 +03:00
statusProcessor := status . New ( db , tc , config , fromClientAPI , log )
2021-06-19 12:18:55 +03:00
streamingProcessor := streaming . New ( db , tc , oauthServer , config , log )
2021-07-05 14:23:03 +03:00
accountProcessor := account . New ( db , tc , mediaHandler , oauthServer , fromClientAPI , federator , config , log )
adminProcessor := admin . New ( db , tc , mediaHandler , fromClientAPI , config , log )
mediaProcessor := mediaProcessor . New ( db , tc , mediaHandler , storage , config , log )
2021-05-08 15:25:55 +03:00
2021-06-13 19:42:28 +03:00
return & processor {
fromClientAPI : fromClientAPI ,
fromFederator : fromFederator ,
federator : federator ,
stop : make ( chan interface { } ) ,
log : log ,
config : config ,
tc : tc ,
oauthServer : oauthServer ,
mediaHandler : mediaHandler ,
storage : storage ,
timelineManager : timelineManager ,
db : db ,
2021-06-17 19:02:33 +03:00
filter : visibility . NewFilter ( db , log ) ,
2021-06-13 19:42:28 +03:00
2021-07-05 14:23:03 +03:00
accountProcessor : accountProcessor ,
adminProcessor : adminProcessor ,
2021-06-19 12:18:55 +03:00
statusProcessor : statusProcessor ,
streamingProcessor : streamingProcessor ,
2021-07-05 14:23:03 +03:00
mediaProcessor : mediaProcessor ,
2021-06-13 19:42:28 +03:00
}
2021-05-08 15:25:55 +03:00
}
// Start starts the Processor, reading from its channels and passing messages back and forth.
2021-08-25 16:34:33 +03:00
func ( p * processor ) Start ( ctx context . Context ) error {
2021-05-08 15:25:55 +03:00
go func ( ) {
DistLoop :
for {
select {
case clientMsg := <- p . fromClientAPI :
2021-07-11 17:22:21 +03:00
p . log . Tracef ( "received message FROM client API: %+v" , clientMsg )
2021-07-05 14:23:03 +03:00
go func ( ) {
2021-08-25 16:34:33 +03:00
if err := p . processFromClientAPI ( ctx , clientMsg ) ; err != nil {
2021-07-05 14:23:03 +03:00
p . log . Error ( err )
}
} ( )
2021-05-08 15:25:55 +03:00
case federatorMsg := <- p . fromFederator :
2021-07-11 17:22:21 +03:00
p . log . Tracef ( "received message FROM federator: %+v" , federatorMsg )
2021-07-05 14:23:03 +03:00
go func ( ) {
2021-08-25 16:34:33 +03:00
if err := p . processFromFederator ( ctx , federatorMsg ) ; err != nil {
2021-07-05 14:23:03 +03:00
p . log . Error ( err )
}
} ( )
2021-05-08 15:25:55 +03:00
case <- p . stop :
break DistLoop
}
}
} ( )
2021-08-15 19:43:08 +03:00
return nil
2021-05-08 15:25:55 +03:00
}
// Stop stops the processor cleanly, finishing handling any remaining messages before closing down.
// TODO: empty message buffer properly before stopping otherwise we'll lose federating messages.
func ( p * processor ) Stop ( ) error {
close ( p . stop )
return nil
}