Add websocket

This commit is contained in:
Lunny Xiao 2022-05-02 17:02:51 +08:00 committed by Jason Song
parent e397fcc259
commit 2babadbc94
5 changed files with 244 additions and 89 deletions

View file

@ -2,19 +2,20 @@ name: checks
on: [push] on: [push]
env: env:
ACT_OWNER: ${{ github.repository_owner }} GOPROXY: https://goproxy.io,direct
ACT_REPOSITORY: ${{ github.repository }}
GO_VERSION: 1.18
CGO_ENABLED: 0
jobs: jobs:
lint: lint:
name: lint name: lint
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/setup-go@v3
with:
go-version: 1.17
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: - uses: Jerome1337/golint-action@v1.0.2
fetch-depth: 0 #- name: golangci-lint
- uses: golangci/golangci-lint-action@v3.1.0 # uses: golangci/golangci-lint-action@v3
with: # with:
version: latest # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
# version: v1.29

140
cmd/damon.go Normal file
View file

@ -0,0 +1,140 @@
package cmd
import (
"context"
"encoding/json"
"errors"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
type Message struct {
Version int //
Type int // message type, 1 register 2 error
RunnerUUID string // runner uuid
ErrCode int // error code
ErrContent string // errors message
EventName string
EventPayload string
}
func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
log.Info().Msgf("Starting runner daemon")
return func(cmd *cobra.Command, args []string) error {
var conn *websocket.Conn
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var failedCnt int
for {
select {
case <-ctx.Done():
if conn != nil {
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Error().Msgf("write close: %v", err)
}
}
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()
case <-ticker.C:
if conn == nil {
log.Trace().Msgf("trying connect %v", "ws://localhost:3000/api/actions")
conn, _, err = websocket.DefaultDialer.DialContext(ctx, "ws://localhost:3000/api/actions", nil)
if err != nil {
log.Error().Msgf("dial: %v", err)
break
}
// register the client
msg := Message{
Version: 1,
Type: 1,
RunnerUUID: "111111",
}
bs, err := json.Marshal(&msg)
if err != nil {
log.Error().Msgf("Marshal: %v", err)
break
}
if err = conn.WriteMessage(websocket.TextMessage, bs); err != nil {
log.Error().Msgf("register failed: %v", err)
conn.Close()
conn = nil
break
}
}
const timeout = time.Second * 10
for {
conn.SetReadDeadline(time.Now().Add(timeout))
conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(timeout)); return nil })
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) ||
websocket.IsCloseError(err, websocket.CloseNormalClosure) {
log.Trace().Msgf("closed from remote")
conn.Close()
conn = nil
} else if !strings.Contains(err.Error(), "i/o timeout") {
log.Error().Msgf("read message failed: %#v", err)
}
failedCnt++
if failedCnt > 60 {
if conn != nil {
conn.Close()
conn = nil
}
failedCnt = 0
}
break
}
// TODO: handle the message
var msg Message
if err = json.Unmarshal(message, &msg); err != nil {
log.Error().Msgf("unmarshal received message faild: %v", err)
continue
}
switch msg.Version {
case 1:
switch msg.Type {
case 1:
log.Info().Msgf("received message: %s", message)
case 2:
case 4:
log.Info().Msgf("received no task")
case 3:
switch msg.EventName {
case "push":
input := Input{
forgeInstance: "github.com",
}
if err := runTask(context.Background(), &input, ""); err != nil {
log.Error().Msgf("run task failed: %v", err)
}
default:
log.Warn().Msgf("unknow event %s with payload %s", msg.EventName, msg.EventPayload)
}
default:
log.Error().Msgf("received a message with an unsupported type: %#v", msg)
}
default:
log.Error().Msgf("recevied a message with an unsupported version, consider upgrade your runner")
}
}
}
}
}
}

View file

@ -70,6 +70,13 @@ func Execute(ctx context.Context) {
Version: version, Version: version,
SilenceUsage: true, SilenceUsage: true,
} }
rootCmd.AddCommand(&cobra.Command{
Aliases: []string{"daemon"},
Use: "daemon [event name to run]\nIf no event name passed, will default to \"on: push\"",
Short: "Run GitHub actions locally by specifying the event name (e.g. `push`) or an action name directly.",
Args: cobra.MaximumNArgs(1),
RunE: runDaemon(ctx, &input),
})
rootCmd.Flags().BoolP("run", "r", false, "run workflows") rootCmd.Flags().BoolP("run", "r", false, "run workflows")
rootCmd.Flags().StringP("job", "j", "", "run job") rootCmd.Flags().StringP("job", "j", "", "run job")
rootCmd.PersistentFlags().StringVarP(&input.forgeInstance, "forge-instance", "", "github.com", "Forge instance to use.") rootCmd.PersistentFlags().StringVarP(&input.forgeInstance, "forge-instance", "", "github.com", "Forge instance to use.")
@ -96,91 +103,96 @@ func getWorkflowsPath() (string, error) {
return p, nil return p, nil
} }
func runTask(ctx context.Context, input *Input, jobID string) error {
workflowsPath, err := getWorkflowsPath()
if err != nil {
return err
}
planner, err := model.NewWorkflowPlanner(workflowsPath, false)
if err != nil {
return err
}
var eventName string
events := planner.GetEvents()
if len(events) > 0 {
// set default event type to first event
// this way user dont have to specify the event.
log.Debug().Msgf("Using detected workflow event: %s", events[0])
eventName = events[0]
} else {
if plan := planner.PlanEvent("push"); plan != nil {
eventName = "push"
}
}
// build the plan for this run
var plan *model.Plan
if jobID != "" {
log.Debug().Msgf("Planning job: %s", jobID)
plan = planner.PlanJob(jobID)
} else {
log.Debug().Msgf("Planning event: %s", eventName)
plan = planner.PlanEvent(eventName)
}
curDir, err := os.Getwd()
if err != nil {
return err
}
// run the plan
config := &runner.Config{
Actor: input.actor,
EventName: eventName,
EventPath: "",
DefaultBranch: "",
ForcePull: input.forcePull,
ForceRebuild: input.forceRebuild,
ReuseContainers: input.reuseContainers,
Workdir: curDir,
BindWorkdir: input.bindWorkdir,
LogOutput: true,
JSONLogger: input.jsonLogger,
// Env: envs,
// Secrets: secrets,
InsecureSecrets: input.insecureSecrets,
Platforms: input.newPlatforms(),
Privileged: input.privileged,
UsernsMode: input.usernsMode,
ContainerArchitecture: input.containerArchitecture,
ContainerDaemonSocket: input.containerDaemonSocket,
UseGitIgnore: input.useGitIgnore,
GitHubInstance: input.forgeInstance,
ContainerCapAdd: input.containerCapAdd,
ContainerCapDrop: input.containerCapDrop,
AutoRemove: input.autoRemove,
ArtifactServerPath: input.artifactServerPath,
ArtifactServerPort: input.artifactServerPort,
NoSkipCheckout: input.noSkipCheckout,
// RemoteName: input.remoteName,
}
r, err := runner.New(config)
if err != nil {
return fmt.Errorf("New config failed: %v", err)
}
cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
cancel()
return nil
})
return executor(ctx)
}
func runCommand(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { func runCommand(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error {
workflowsPath, err := getWorkflowsPath() jobID, err := cmd.Flags().GetString("job")
if err != nil {
return err
}
planner, err := model.NewWorkflowPlanner(workflowsPath, false)
if err != nil { if err != nil {
return err return err
} }
var eventName string return runTask(ctx, input, jobID)
events := planner.GetEvents()
if len(events) > 0 {
// set default event type to first event
// this way user dont have to specify the event.
log.Debug().Msgf("Using detected workflow event: %s", events[0])
eventName = events[0]
} else {
if len(args) > 0 {
eventName = args[0]
} else if plan := planner.PlanEvent("push"); plan != nil {
eventName = "push"
}
}
// build the plan for this run
var plan *model.Plan
if jobID, err := cmd.Flags().GetString("job"); err != nil {
return err
} else if jobID != "" {
log.Debug().Msgf("Planning job: %s", jobID)
plan = planner.PlanJob(jobID)
} else {
log.Debug().Msgf("Planning event: %s", eventName)
plan = planner.PlanEvent(eventName)
}
curDir, err := os.Getwd()
if err != nil {
return err
}
// run the plan
config := &runner.Config{
Actor: input.actor,
EventName: eventName,
EventPath: "",
DefaultBranch: "",
ForcePull: input.forcePull,
ForceRebuild: input.forceRebuild,
ReuseContainers: input.reuseContainers,
Workdir: curDir,
BindWorkdir: input.bindWorkdir,
LogOutput: !input.noOutput,
JSONLogger: input.jsonLogger,
// Env: envs,
// Secrets: secrets,
InsecureSecrets: input.insecureSecrets,
Platforms: input.newPlatforms(),
Privileged: input.privileged,
UsernsMode: input.usernsMode,
ContainerArchitecture: input.containerArchitecture,
ContainerDaemonSocket: input.containerDaemonSocket,
UseGitIgnore: input.useGitIgnore,
GitHubInstance: input.forgeInstance,
ContainerCapAdd: input.containerCapAdd,
ContainerCapDrop: input.containerCapDrop,
AutoRemove: input.autoRemove,
ArtifactServerPath: input.artifactServerPath,
ArtifactServerPort: input.artifactServerPort,
NoSkipCheckout: input.noSkipCheckout,
// RemoteName: input.remoteName,
}
r, err := runner.New(config)
if err != nil {
return fmt.Errorf("New config failed: %v", err)
}
cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
cancel()
return nil
})
return executor(ctx)
} }
} }

1
go.mod
View file

@ -3,6 +3,7 @@ module gitea.com/gitea/act_runner
go 1.18 go 1.18
require ( require (
github.com/gorilla/websocket v1.4.2
github.com/nektos/act v0.2.26 github.com/nektos/act v0.2.26
github.com/rs/zerolog v1.26.1 github.com/rs/zerolog v1.26.1
github.com/spf13/cobra v1.4.0 github.com/spf13/cobra v1.4.0

1
go.sum
View file

@ -736,6 +736,7 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE= github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE=
github.com/gostaticanalysis/analysisutil v0.0.3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE= github.com/gostaticanalysis/analysisutil v0.0.3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE=