Merge pull request 'cherry-pick commits from act_runner' (#23) from earl-warren/runner:wip-sync into main

Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/23
Reviewed-by: dachary <dachary@noreply.code.forgejo.org>
This commit is contained in:
earl-warren 2023-04-04 23:07:29 +00:00
commit cd20007e4d
25 changed files with 599 additions and 358 deletions

1
.gitignore vendored
View file

@ -4,6 +4,7 @@ forgejo-runner
.runner
coverage.txt
/gitea-vet
/config.yaml
# MS VSCode
.vscode

View file

@ -18,7 +18,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/render"
"github.com/nektos/act/pkg/common"
log "github.com/sirupsen/logrus"
_ "modernc.org/sqlite"
"xorm.io/builder"
@ -39,16 +38,19 @@ type Handler struct {
gc atomic.Bool
gcAt time.Time
outboundIP string
}
func NewHandler() (*Handler, error) {
func NewHandler(dir, outboundIP string, port uint16) (*Handler, error) {
h := &Handler{}
dir := "" // TODO: make the dir configurable if necessary
if home, err := os.UserHomeDir(); err != nil {
return nil, err
} else {
dir = filepath.Join(home, ".cache/actcache")
if dir == "" {
if home, err := os.UserHomeDir(); err != nil {
return nil, err
} else {
dir = filepath.Join(home, ".cache", "actcache")
}
}
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
@ -69,6 +71,14 @@ func NewHandler() (*Handler, error) {
}
h.storage = storage
if outboundIP != "" {
h.outboundIP = outboundIP
} else if ip, err := getOutboundIP(); err != nil {
return nil, err
} else {
h.outboundIP = ip.String()
}
router := chi.NewRouter()
router.Use(middleware.RequestLogger(&middleware.DefaultLogFormatter{Logger: logger}))
router.Use(func(handler http.Handler) http.Handler {
@ -95,8 +105,7 @@ func NewHandler() (*Handler, error) {
h.gcCache()
// TODO: make the port configurable if necessary
listener, err := net.Listen("tcp", ":0") // random available port
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces
if err != nil {
return nil, err
}
@ -113,7 +122,7 @@ func NewHandler() (*Handler, error) {
func (h *Handler) ExternalURL() string {
// TODO: make the external url configurable if necessary
return fmt.Sprintf("http://%s:%d",
common.GetOutboundIP().String(),
h.outboundIP,
h.listener.Addr().(*net.TCPAddr).Port)
}

View file

@ -5,6 +5,7 @@ package artifactcache
import (
"fmt"
"net"
"net/http"
"strconv"
"strings"
@ -44,8 +45,35 @@ func parseContentRange(s string) (int64, int64, error) {
return start, stop, nil
}
func getOutboundIP() (net.IP, error) {
// FIXME: It makes more sense to use the gateway IP address of container network
if conn, err := net.Dial("udp", "8.8.8.8:80"); err == nil {
defer conn.Close()
return conn.LocalAddr().(*net.UDPAddr).IP, nil
}
if ifaces, err := net.Interfaces(); err == nil {
for _, i := range ifaces {
if addrs, err := i.Addrs(); err == nil {
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.IsGlobalUnicast() {
return ip, nil
}
}
}
}
}
return nil, fmt.Errorf("no outbound IP address found")
}
// engine is a wrapper of *xorm.Engine, with a lock.
// To avoid racing of sqlite, we don't careperformance here.
// To avoid racing of sqlite, we don't care performance here.
type engine struct {
e *xorm.Engine
m sync.Mutex

10
client/header.go Normal file
View file

@ -0,0 +1,10 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package client
const (
UUIDHeader = "x-runner-uuid"
TokenHeader = "x-runner-token"
VersionHeader = "x-runner-version"
)

View file

@ -8,7 +8,6 @@ import (
"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
"codeberg.org/forgejo/runner/core"
"github.com/bufbuild/connect-go"
)
@ -32,13 +31,13 @@ func New(endpoint string, insecure bool, uuid, token, runnerVersion string, opts
opts = append(opts, connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
if uuid != "" {
req.Header().Set(core.UUIDHeader, uuid)
req.Header().Set(UUIDHeader, uuid)
}
if token != "" {
req.Header().Set(core.TokenHeader, token)
req.Header().Set(TokenHeader, token)
}
if runnerVersion != "" {
req.Header().Set(core.VersionHeader, runnerVersion)
req.Header().Set(VersionHeader, runnerVersion)
}
return next(ctx, req)
}

View file

@ -1,24 +1,20 @@
// SPDX-License-Identifier: MIT
package cmd
import (
"context"
"fmt"
"os"
"github.com/spf13/cobra"
"codeberg.org/forgejo/runner/config"
)
// the version of act_runner
var version = "develop"
type globalArgs struct {
EnvFile string
}
func Execute(ctx context.Context) {
// task := runtime.NewTask("gitea", 0, nil, nil)
var gArgs globalArgs
// ./act_runner
rootCmd := &cobra.Command{
Use: "act_runner [event name to run]\nIf no event name passed, will default to \"on: push\"",
@ -26,9 +22,9 @@ func Execute(ctx context.Context) {
Args: cobra.MaximumNArgs(1),
Version: version,
SilenceUsage: true,
RunE: runDaemon(ctx, gArgs.EnvFile),
}
rootCmd.PersistentFlags().StringVarP(&gArgs.EnvFile, "env-file", "", ".env", "Read in a file of environment variables.")
configFile := ""
rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "", "Config file path")
// ./act_runner register
var regArgs registerArgs
@ -36,11 +32,10 @@ func Execute(ctx context.Context) {
Use: "register",
Short: "Register a runner to the server",
Args: cobra.MaximumNArgs(0),
RunE: runRegister(ctx, &regArgs, gArgs.EnvFile), // must use a pointer to regArgs
RunE: runRegister(ctx, &regArgs, &configFile), // must use a pointer to regArgs
}
registerCmd.Flags().BoolVar(&regArgs.NoInteractive, "no-interactive", false, "Disable interactive mode")
registerCmd.Flags().StringVar(&regArgs.InstanceAddr, "instance", "", "Forgejo instance address")
registerCmd.Flags().BoolVar(&regArgs.Insecure, "insecure", false, "If check server's certificate if it's https protocol")
registerCmd.Flags().StringVar(&regArgs.Token, "token", "", "Runner token")
registerCmd.Flags().StringVar(&regArgs.RunnerName, "name", "", "Runner name")
registerCmd.Flags().StringVar(&regArgs.Labels, "labels", "", "Runner tags, comma separated")
@ -51,13 +46,23 @@ func Execute(ctx context.Context) {
Use: "daemon",
Short: "Run as a runner daemon",
Args: cobra.MaximumNArgs(1),
RunE: runDaemon(ctx, gArgs.EnvFile),
RunE: runDaemon(ctx, &configFile),
}
rootCmd.AddCommand(daemonCmd)
// ./act_runner exec
rootCmd.AddCommand(loadExecCmd(ctx))
// ./act_runner config
rootCmd.AddCommand(&cobra.Command{
Use: "generate-config",
Short: "Generate an example config file",
Args: cobra.MaximumNArgs(0),
Run: func(_ *cobra.Command, _ []string) {
fmt.Printf("%s", config.Example)
},
})
// hide completion command
rootCmd.CompletionOptions.HiddenDefaultCmd = true

View file

@ -2,8 +2,13 @@ package cmd
import (
"context"
"fmt"
"os"
"strings"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"codeberg.org/forgejo/runner/artifactcache"
"codeberg.org/forgejo/runner/client"
@ -11,32 +16,32 @@ import (
"codeberg.org/forgejo/runner/engine"
"codeberg.org/forgejo/runner/poller"
"codeberg.org/forgejo/runner/runtime"
"github.com/joho/godotenv"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
func runDaemon(ctx context.Context, envFile string) func(cmd *cobra.Command, args []string) error {
func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
log.Infoln("Starting runner daemon")
_ = godotenv.Load(envFile)
cfg, err := config.FromEnviron()
cfg, err := config.LoadDefault(*configFile)
if err != nil {
log.WithError(err).
Fatalln("invalid configuration")
return fmt.Errorf("invalid configuration: %w", err)
}
initLogging(cfg)
reg, err := config.LoadRegistration(cfg.Runner.File)
if os.IsNotExist(err) {
log.Error("registration file not found, please register the runner first")
return err
} else if err != nil {
return fmt.Errorf("failed to load registration file: %w", err)
}
// require docker if a runner label uses a docker backend
needsDocker := false
for _, l := range cfg.Runner.Labels {
splits := strings.SplitN(l, ":", 2)
if len(splits) == 2 && strings.HasPrefix(splits[1], "docker://") {
for _, l := range reg.Labels {
_, schema, _, _ := runtime.ParseLabel(l)
if schema == "docker" {
needsDocker = true
break
}
@ -50,43 +55,44 @@ func runDaemon(ctx context.Context, envFile string) func(cmd *cobra.Command, arg
}
}
handler, err := artifactcache.NewHandler()
if err != nil {
return err
}
log.Infof("cache handler listens on: %v", handler.ExternalURL())
var g errgroup.Group
cli := client.New(
cfg.Client.Address,
cfg.Client.Insecure,
cfg.Runner.UUID,
cfg.Runner.Token,
reg.Address,
cfg.Runner.Insecure,
reg.UUID,
reg.Token,
version,
)
runner := &runtime.Runner{
Client: cli,
Machine: cfg.Runner.Name,
ForgeInstance: cfg.Client.Address,
Environ: cfg.Runner.Environ,
Labels: cfg.Runner.Labels,
Machine: reg.Name,
ForgeInstance: reg.Address,
Environ: cfg.Runner.Envs,
Labels: reg.Labels,
Network: cfg.Container.Network,
Version: version,
CacheHandler: handler,
}
if *cfg.Cache.Enabled {
if handler, err := artifactcache.NewHandler(cfg.Cache.Dir, cfg.Cache.Host, cfg.Cache.Port); err != nil {
log.Errorf("cannot init cache server, it will be disabled: %v", err)
} else {
log.Infof("cache handler listens on: %v", handler.ExternalURL())
runner.CacheHandler = handler
}
}
poller := poller.New(
cli,
runner.Run,
cfg.Runner.Capacity,
cfg,
)
g.Go(func() error {
l := log.WithField("capacity", cfg.Runner.Capacity).
WithField("endpoint", cfg.Client.Address).
WithField("os", cfg.Platform.OS).
WithField("arch", cfg.Platform.Arch)
WithField("endpoint", reg.Address)
l.Infoln("polling the remote server")
if err := poller.Poll(ctx); err != nil {
@ -106,17 +112,22 @@ func runDaemon(ctx context.Context, envFile string) func(cmd *cobra.Command, arg
}
// initLogging setup the global logrus logger.
func initLogging(cfg config.Config) {
func initLogging(cfg *config.Config) {
isTerm := isatty.IsTerminal(os.Stdout.Fd())
log.SetFormatter(&log.TextFormatter{
DisableColors: !isTerm,
FullTimestamp: true,
})
if cfg.Debug {
log.SetLevel(log.DebugLevel)
}
if cfg.Trace {
log.SetLevel(log.TraceLevel)
if l := cfg.Log.Level; l != "" {
level, err := log.ParseLevel(l)
if err != nil {
log.WithError(err).
Errorf("invalid log level: %q", l)
}
if log.GetLevel() != level {
log.Infof("log level changed to %v", level)
log.SetLevel(level)
}
}
}

View file

@ -54,6 +54,7 @@ type executeArgs struct {
noSkipCheckout bool
debug bool
dryrun bool
image string
cacheHandler *artifactcache.Handler
}
@ -347,7 +348,7 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
}
// init a cache server
handler, err := artifactcache.NewHandler()
handler, err := artifactcache.NewHandler("", "", 0)
if err != nil {
return err
}
@ -385,7 +386,7 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
ContainerNetworkMode: "bridge",
DefaultActionInstance: execArgs.defaultActionsUrl,
PlatformPicker: func(_ []string) string {
return "node:16-bullseye"
return execArgs.image
},
}
@ -461,6 +462,7 @@ func loadExecCmd(ctx context.Context) *cobra.Command {
execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout")
execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log")
execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode")
execCmd.PersistentFlags().StringVarP(&execArg.image, "image", "i", "node:16-bullseye", "docker image to use")
return execCmd
}

View file

@ -1,3 +1,6 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package cmd
import (
@ -6,24 +9,25 @@ import (
"fmt"
"os"
"os/signal"
"runtime"
goruntime "runtime"
"strings"
"time"
pingv1 "code.gitea.io/actions-proto-go/ping/v1"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"codeberg.org/forgejo/runner/client"
"codeberg.org/forgejo/runner/config"
"codeberg.org/forgejo/runner/register"
"codeberg.org/forgejo/runner/runtime"
"github.com/bufbuild/connect-go"
"github.com/joho/godotenv"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
// runRegister registers a runner to the server
func runRegister(ctx context.Context, regArgs *registerArgs, envFile string) func(*cobra.Command, []string) error {
func runRegister(ctx context.Context, regArgs *registerArgs, configFile *string) func(*cobra.Command, []string) error {
return func(cmd *cobra.Command, args []string) error {
log.SetReportCaller(false)
isTerm := isatty.IsTerminal(os.Stdout.Fd())
@ -34,7 +38,7 @@ func runRegister(ctx context.Context, regArgs *registerArgs, envFile string) fun
log.SetLevel(log.DebugLevel)
log.Infof("Registering runner, arch=%s, os=%s, version=%s.",
runtime.GOARCH, runtime.GOOS, version)
goruntime.GOARCH, goruntime.GOOS, version)
// runner always needs root permission
if os.Getuid() != 0 {
@ -43,14 +47,13 @@ func runRegister(ctx context.Context, regArgs *registerArgs, envFile string) fun
}
if regArgs.NoInteractive {
if err := registerNoInteractive(envFile, regArgs); err != nil {
if err := registerNoInteractive(*configFile, regArgs); err != nil {
return err
}
} else {
go func() {
if err := registerInteractive(envFile); err != nil {
// log.Errorln(err)
os.Exit(2)
if err := registerInteractive(*configFile); err != nil {
log.Fatal(err)
return
}
os.Exit(0)
@ -69,7 +72,6 @@ func runRegister(ctx context.Context, regArgs *registerArgs, envFile string) fun
type registerArgs struct {
NoInteractive bool
InstanceAddr string
Insecure bool
Token string
RunnerName string
Labels string
@ -97,7 +99,6 @@ var defaultLabels = []string{
type registerInputs struct {
InstanceAddr string
Insecure bool
Token string
RunnerName string
CustomLabels []string
@ -118,12 +119,9 @@ func (r *registerInputs) validate() error {
func validateLabels(labels []string) error {
for _, label := range labels {
values := strings.SplitN(label, ":", 2)
if len(values) > 2 {
return fmt.Errorf("Invalid label: %s", label)
if _, _, _, err := runtime.ParseLabel(label); err != nil {
return err
}
// len(values) == 1, label for non docker execution environment
// TODO: validate value format, like docker://node:16-buster
}
return nil
}
@ -164,7 +162,7 @@ func (r *registerInputs) assignToNext(stage registerStage, value string) registe
}
if validateLabels(r.CustomLabels) != nil {
log.Infoln("Invalid labels, please input again, leave blank to use the default labels (for example, ubuntu-20.04:docker://node:16-bullseye,ubuntu-18.04:docker://node:16-buster)")
log.Infoln("Invalid labels, please input again, leave blank to use the default labels (for example, ubuntu-20.04:docker://node:16-bullseye,ubuntu-18.04:docker://node:16-buster,linux_arm:host)")
return StageInputCustomLabels
}
return StageWaitingForRegistration
@ -172,16 +170,17 @@ func (r *registerInputs) assignToNext(stage registerStage, value string) registe
return StageUnknown
}
func registerInteractive(envFile string) error {
func registerInteractive(configFile string) error {
var (
reader = bufio.NewReader(os.Stdin)
stage = StageInputInstance
inputs = new(registerInputs)
)
// check if overwrite local config
_ = godotenv.Load(envFile)
cfg, _ := config.FromEnviron()
cfg, err := config.LoadDefault(configFile)
if err != nil {
return fmt.Errorf("failed to load config: %v", err)
}
if f, err := os.Stat(cfg.Runner.File); err == nil && !f.IsDir() {
stage = StageOverwriteLocalConfig
}
@ -197,7 +196,7 @@ func registerInteractive(envFile string) error {
if stage == StageWaitingForRegistration {
log.Infof("Registering runner, name=%s, instance=%s, labels=%v.", inputs.RunnerName, inputs.InstanceAddr, inputs.CustomLabels)
if err := doRegister(&cfg, inputs); err != nil {
if err := doRegister(cfg, inputs); err != nil {
log.Errorf("Failed to register runner: %v", err)
} else {
log.Infof("Runner registered successfully.")
@ -221,25 +220,26 @@ func printStageHelp(stage registerStage) {
case StageOverwriteLocalConfig:
log.Infoln("Runner is already registered, overwrite local config? [y/N]")
case StageInputInstance:
log.Infoln("Enter the Forgejo instance URL (for example, https://codeberg.org/):")
log.Infoln("Enter the Gitea instance URL (for example, https://gitea.com/):")
case StageInputToken:
log.Infoln("Enter the runner token:")
case StageInputRunnerName:
hostname, _ := os.Hostname()
log.Infof("Enter the runner name (if set empty, use hostname:%s ):\n", hostname)
log.Infof("Enter the runner name (if set empty, use hostname: %s):\n", hostname)
case StageInputCustomLabels:
log.Infoln("Enter the runner labels, leave blank to use the default labels (comma-separated, for example, self-hosted,ubuntu-20.04:docker://node:16-bullseye,ubuntu-18.04:docker://node:16-buster):")
log.Infoln("Enter the runner labels, leave blank to use the default labels (comma-separated, for example, ubuntu-20.04:docker://node:16-bullseye,ubuntu-18.04:docker://node:16-buster,linux_arm:host):")
case StageWaitingForRegistration:
log.Infoln("Waiting for registration...")
}
}
func registerNoInteractive(envFile string, regArgs *registerArgs) error {
_ = godotenv.Load(envFile)
cfg, _ := config.FromEnviron()
func registerNoInteractive(configFile string, regArgs *registerArgs) error {
cfg, err := config.LoadDefault(configFile)
if err != nil {
return err
}
inputs := &registerInputs{
InstanceAddr: regArgs.InstanceAddr,
Insecure: regArgs.Insecure,
Token: regArgs.Token,
RunnerName: regArgs.RunnerName,
CustomLabels: defaultLabels,
@ -256,7 +256,7 @@ func registerNoInteractive(envFile string, regArgs *registerArgs) error {
log.WithError(err).Errorf("Invalid input, please re-run act command.")
return nil
}
if err := doRegister(&cfg, inputs); err != nil {
if err := doRegister(cfg, inputs); err != nil {
log.Errorf("Failed to register runner: %v", err)
return nil
}
@ -270,7 +270,7 @@ func doRegister(cfg *config.Config, inputs *registerInputs) error {
// initial http client
cli := client.New(
inputs.InstanceAddr,
inputs.Insecure,
cfg.Runner.Insecure,
"",
"",
version,
@ -290,18 +290,45 @@ func doRegister(cfg *config.Config, inputs *registerInputs) error {
}
if err != nil {
log.WithError(err).
Errorln("Cannot ping the Forgejo instance server")
Errorln("Cannot ping the Gitea instance server")
// TODO: if ping failed, retry or exit
time.Sleep(time.Second)
} else {
log.Debugln("Successfully pinged the Forgejo instance server")
log.Debugln("Successfully pinged the Gitea instance server")
break
}
}
cfg.Runner.Name = inputs.RunnerName
cfg.Runner.Token = inputs.Token
cfg.Runner.Labels = inputs.CustomLabels
_, err := register.New(cli).Register(ctx, cfg.Runner)
return err
reg := &config.Registration{
Name: inputs.RunnerName,
Token: inputs.Token,
Address: inputs.InstanceAddr,
Labels: inputs.CustomLabels,
}
labels := make([]string, len(reg.Labels))
for i, v := range reg.Labels {
l, _, _, _ := runtime.ParseLabel(v)
labels[i] = l
}
// register new runner.
resp, err := cli.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{
Name: reg.Name,
Token: reg.Token,
AgentLabels: labels,
}))
if err != nil {
log.WithError(err).Error("poller: cannot register new runner")
return err
}
reg.ID = resp.Msg.Runner.Id
reg.UUID = resp.Msg.Runner.Uuid
reg.Name = resp.Msg.Runner.Name
reg.Token = resp.Msg.Runner.Token
if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
return fmt.Errorf("failed to save runner config: %w", err)
}
return nil
}

View file

@ -1,10 +0,0 @@
package cmd
import "testing"
func TestValidateLabels(t *testing.T) {
labels := []string{"ubuntu-latest:docker://node:16-buster", "self-hosted"}
if err := validateLabels(labels); err != nil {
t.Errorf("validateLabels() error = %v", err)
}
}

View file

@ -0,0 +1,42 @@
# Example configuration file, it's safe to copy this as the default config file without any modification.
log:
# The level of logging, can be trace, debug, info, warn, error, fatal
level: info
runner:
# Where to store the registration result.
file: .runner
# Execute how many tasks concurrently at the same time.
capacity: 1
# Extra environment variables to run jobs.
envs:
A_TEST_ENV_NAME_1: a_test_env_value_1
A_TEST_ENV_NAME_2: a_test_env_value_2
# Extra environment variables to run jobs from a file.
# It will be ignored if it's empty or the file doesn't exist.
env_file: .env
# The timeout for a job to be finished.
# Please note that the Gitea instance also has a timeout (3h by default) for the job.
# So the job could be stopped by the Gitea instance if it's timeout is shorter than this.
timeout: 3h
# Whether skip verifying the TLS certificate of the Gitea instance.
insecure: false
cache:
# Enable cache server to use actions/cache.
enabled: true
# The directory to store the cache data.
# If it's empty, the cache data will be stored in $HOME/.cache/actcache.
dir: ""
# The host of the cache server.
# It's not for the address to listen, but the address to connect from job containers.
# So 0.0.0.0 is a bad choice, leave it empty to detect automatically.
host: ""
# The port of the cache server.
# 0 means to use a random available port.
port: 0
container:
# Which network to use for the job containers.
network: bridge

View file

@ -1,117 +1,92 @@
package config
import (
"encoding/json"
"io"
"fmt"
"os"
"runtime"
"strconv"
"codeberg.org/forgejo/runner/core"
"path/filepath"
"time"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"gopkg.in/yaml.v3"
)
type (
// Config provides the system configuration.
Config struct {
Debug bool `envconfig:"GITEA_DEBUG"`
Trace bool `envconfig:"GITEA_TRACE"`
Client Client
Runner Runner
Platform Platform
}
Client struct {
Address string `ignored:"true"`
Insecure bool
}
type Config struct {
Log struct {
Level string `yaml:"level"`
} `yaml:"log"`
Runner struct {
UUID string `ignored:"true"`
Name string `envconfig:"GITEA_RUNNER_NAME"`
Token string `ignored:"true"`
Capacity int `envconfig:"GITEA_RUNNER_CAPACITY" default:"1"`
File string `envconfig:"FORGEJO_RUNNER_FILE" default:".runner"`
Environ map[string]string `envconfig:"GITEA_RUNNER_ENVIRON"`
EnvFile string `envconfig:"GITEA_RUNNER_ENV_FILE"`
Labels []string `envconfig:"GITEA_RUNNER_LABELS"`
File string `yaml:"file"`
Capacity int `yaml:"capacity"`
Envs map[string]string `yaml:"envs"`
EnvFile string `yaml:"env_file"`
Timeout time.Duration `yaml:"timeout"`
Insecure bool `yaml:"insecure"`
} `yaml:"runner"`
Cache struct {
Enabled *bool `yaml:"enabled"` // pointer to distinguish between false and not set, and it will be true if not set
Dir string `yaml:"dir"`
Host string `yaml:"host"`
Port uint16 `yaml:"port"`
} `yaml:"cache"`
Container struct {
Network string `yaml:"network"`
}
}
Platform struct {
OS string `envconfig:"GITEA_PLATFORM_OS"`
Arch string `envconfig:"GITEA_PLATFORM_ARCH"`
}
)
// FromEnviron returns the settings from the environment.
func FromEnviron() (Config, error) {
cfg := Config{}
if err := envconfig.Process("", &cfg); err != nil {
return cfg, err
}
// check runner config exist
f, err := os.Stat(cfg.Runner.File)
if err == nil && !f.IsDir() {
jsonFile, _ := os.Open(cfg.Runner.File)
defer jsonFile.Close()
byteValue, _ := io.ReadAll(jsonFile)
var runner core.Runner
if err := json.Unmarshal(byteValue, &runner); err != nil {
return cfg, err
}
if runner.UUID != "" {
cfg.Runner.UUID = runner.UUID
}
if runner.Name != "" {
cfg.Runner.Name = runner.Name
}
if runner.Token != "" {
cfg.Runner.Token = runner.Token
}
if len(runner.Labels) != 0 {
cfg.Runner.Labels = runner.Labels
}
if runner.Address != "" {
cfg.Client.Address = runner.Address
}
if runner.Insecure != "" {
cfg.Client.Insecure, _ = strconv.ParseBool(runner.Insecure)
}
} else if err != nil {
return cfg, err
}
// runner config
if cfg.Runner.Environ == nil {
cfg.Runner.Environ = map[string]string{
"GITHUB_API_URL": cfg.Client.Address + "/api/v1",
"GITHUB_SERVER_URL": cfg.Client.Address,
}
}
if cfg.Runner.Name == "" {
cfg.Runner.Name, _ = os.Hostname()
}
// platform config
if cfg.Platform.OS == "" {
cfg.Platform.OS = runtime.GOOS
}
if cfg.Platform.Arch == "" {
cfg.Platform.Arch = runtime.GOARCH
}
if file := cfg.Runner.EnvFile; file != "" {
envs, err := godotenv.Read(file)
// LoadDefault returns the default configuration.
// If file is not empty, it will be used to load the configuration.
func LoadDefault(file string) (*Config, error) {
cfg := &Config{}
if file != "" {
f, err := os.Open(file)
if err != nil {
return cfg, err
return nil, err
}
for k, v := range envs {
cfg.Runner.Environ[k] = v
defer f.Close()
decoder := yaml.NewDecoder(f)
if err := decoder.Decode(&cfg); err != nil {
return nil, err
}
}
compatibleWithOldEnvs(file != "", cfg)
if cfg.Runner.EnvFile != "" {
if stat, err := os.Stat(cfg.Runner.EnvFile); err == nil && !stat.IsDir() {
envs, err := godotenv.Read(cfg.Runner.EnvFile)
if err != nil {
return nil, fmt.Errorf("read env file %q: %w", cfg.Runner.EnvFile, err)
}
for k, v := range envs {
cfg.Runner.Envs[k] = v
}
}
}
if cfg.Log.Level == "" {
cfg.Log.Level = "info"
}
if cfg.Runner.File == "" {
cfg.Runner.File = ".runner"
}
if cfg.Runner.Capacity <= 0 {
cfg.Runner.Capacity = 1
}
if cfg.Runner.Timeout <= 0 {
cfg.Runner.Timeout = 3 * time.Hour
}
if cfg.Cache.Enabled == nil {
b := true
cfg.Cache.Enabled = &b
}
if *cfg.Cache.Enabled {
if cfg.Cache.Dir == "" {
home, _ := os.UserHomeDir()
cfg.Cache.Dir = filepath.Join(home, ".cache", "actcache")
}
}
if cfg.Container.Network == "" {
cfg.Container.Network = "bridge"
}
return cfg, nil
}

62
config/deprecated.go Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import (
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
)
// Deprecated: could be removed in the future. TODO: remove it when Gitea 1.20.0 is released.
// Be compatible with old envs.
func compatibleWithOldEnvs(fileUsed bool, cfg *Config) {
handleEnv := func(key string) (string, bool) {
if v, ok := os.LookupEnv(key); ok {
if fileUsed {
log.Warnf("env %s has been ignored because config file is used", key)
return "", false
}
log.Warnf("env %s will be deprecated, please use config file instead", key)
return v, true
}
return "", false
}
if v, ok := handleEnv("GITEA_DEBUG"); ok {
if b, _ := strconv.ParseBool(v); b {
cfg.Log.Level = "debug"
}
}
if v, ok := handleEnv("GITEA_TRACE"); ok {
if b, _ := strconv.ParseBool(v); b {
cfg.Log.Level = "trace"
}
}
if v, ok := handleEnv("GITEA_RUNNER_CAPACITY"); ok {
if i, _ := strconv.Atoi(v); i > 0 {
cfg.Runner.Capacity = i
}
}
if v, ok := handleEnv("GITEA_RUNNER_FILE"); ok {
cfg.Runner.File = v
}
if v, ok := handleEnv("GITEA_RUNNER_ENVIRON"); ok {
splits := strings.Split(v, ",")
if cfg.Runner.Envs == nil {
cfg.Runner.Envs = map[string]string{}
}
for _, split := range splits {
kv := strings.SplitN(split, ":", 2)
if len(kv) == 2 && kv[0] != "" {
cfg.Runner.Envs[kv[0]] = kv[1]
}
}
}
if v, ok := handleEnv("GITEA_RUNNER_ENV_FILE"); ok {
cfg.Runner.EnvFile = v
}
}

9
config/embed.go Normal file
View file

@ -0,0 +1,9 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import _ "embed"
//go:embed config.example.yaml
var Example []byte

54
config/registration.go Normal file
View file

@ -0,0 +1,54 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package config
import (
"encoding/json"
"os"
)
const registrationWarning = "This file is automatically generated by act-runner. Do not edit it manually unless you know what you are doing. Removing this file will cause act runner to re-register as a new runner."
// Registration is the registration information for a runner
type Registration struct {
Warning string `json:"WARNING"` // Warning message to display, it's always the registrationWarning constant
ID int64 `json:"id"`
UUID string `json:"uuid"`
Name string `json:"name"`
Token string `json:"token"`
Address string `json:"address"`
Labels []string `json:"labels"`
}
func LoadRegistration(file string) (*Registration, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
}
defer f.Close()
var reg Registration
if err := json.NewDecoder(f).Decode(&reg); err != nil {
return nil, err
}
reg.Warning = ""
return &reg, nil
}
func SaveRegistration(file string, reg *Registration) error {
f, err := os.Create(file)
if err != nil {
return err
}
defer f.Close()
reg.Warning = registrationWarning
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
return enc.Encode(reg)
}

View file

@ -1,18 +0,0 @@
package core
const (
UUIDHeader = "x-runner-uuid"
TokenHeader = "x-runner-token"
VersionHeader = "x-runner-version"
)
// Runner struct
type Runner struct {
ID int64 `json:"id"`
UUID string `json:"uuid"`
Name string `json:"name"`
Token string `json:"token"`
Address string `json:"address"`
Insecure string `json:"insecure"`
Labels []string `json:"labels"`
}

3
go.mod
View file

@ -11,7 +11,6 @@ require (
github.com/go-chi/chi/v5 v5.0.8
github.com/go-chi/render v1.0.2
github.com/joho/godotenv v1.5.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/mattn/go-isatty v0.0.17
github.com/nektos/act v0.0.0
github.com/sirupsen/logrus v1.9.0
@ -19,6 +18,7 @@ require (
golang.org/x/sync v0.1.0
golang.org/x/term v0.6.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
modernc.org/sqlite v1.14.2
xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978
xorm.io/xorm v1.3.2
@ -91,7 +91,6 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/uint128 v1.1.1 // indirect
modernc.org/cc/v3 v3.35.18 // indirect
modernc.org/ccgo/v3 v3.12.82 // indirect

2
go.sum
View file

@ -302,8 +302,6 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=

View file

@ -7,22 +7,23 @@ import (
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"codeberg.org/forgejo/runner/client"
"github.com/bufbuild/connect-go"
log "github.com/sirupsen/logrus"
"codeberg.org/forgejo/runner/client"
"codeberg.org/forgejo/runner/config"
)
var ErrDataLock = errors.New("Data Lock Error")
func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error, workerNum int) *Poller {
func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error, cfg *config.Config) *Poller {
return &Poller{
Client: cli,
Dispatch: dispatch,
routineGroup: newRoutineGroup(),
metric: &metric{},
workerNum: workerNum,
ready: make(chan struct{}, 1),
cfg: cfg,
}
}
@ -34,13 +35,13 @@ type Poller struct {
routineGroup *routineGroup
metric *metric
ready chan struct{}
workerNum int
cfg *config.Config
}
func (p *Poller) schedule() {
p.Lock()
defer p.Unlock()
if int(p.metric.BusyWorkers()) >= p.workerNum {
if int(p.metric.BusyWorkers()) >= p.cfg.Runner.Capacity {
return
}
@ -54,6 +55,40 @@ func (p *Poller) Wait() {
p.routineGroup.Wait()
}
func (p *Poller) handle(ctx context.Context, l *log.Entry) {
defer func() {
if r := recover(); r != nil {
l.Errorf("handle task panic: %+v", r)
}
}()
for {
select {
case <-ctx.Done():
return
default:
task, err := p.pollTask(ctx)
if task == nil || err != nil {
if err != nil {
l.Errorf("can't find the task: %v", err.Error())
}
time.Sleep(5 * time.Second)
break
}
p.metric.IncBusyWorker()
p.routineGroup.Run(func() {
defer p.schedule()
defer p.metric.DecBusyWorker()
if err := p.dispatchTask(ctx, task); err != nil {
l.Errorf("execute task: %v", err.Error())
}
})
return
}
}
}
func (p *Poller) Poll(ctx context.Context) error {
l := log.WithField("func", "Poll")
@ -67,32 +102,7 @@ func (p *Poller) Poll(ctx context.Context) error {
case <-ctx.Done():
return nil
}
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
default:
task, err := p.pollTask(ctx)
if task == nil || err != nil {
if err != nil {
l.Errorf("can't find the task: %v", err.Error())
}
time.Sleep(5 * time.Second)
break
}
p.metric.IncBusyWorker()
p.routineGroup.Run(func() {
defer p.schedule()
defer p.metric.DecBusyWorker()
if err := p.dispatchTask(ctx, task); err != nil {
l.Errorf("execute task: %v", err.Error())
}
})
break LOOP
}
}
p.handle(ctx, l)
}
}
@ -139,7 +149,7 @@ func (p *Poller) dispatchTask(ctx context.Context, task *runnerv1.Task) error {
}
}()
runCtx, cancel := context.WithTimeout(ctx, time.Hour)
runCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.Timeout)
defer cancel()
return p.Dispatch(runCtx, task)

View file

@ -1,63 +0,0 @@
package register
import (
"context"
"encoding/json"
"os"
"strconv"
"strings"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"codeberg.org/forgejo/runner/client"
"codeberg.org/forgejo/runner/config"
"codeberg.org/forgejo/runner/core"
"github.com/bufbuild/connect-go"
log "github.com/sirupsen/logrus"
)
func New(cli client.Client) *Register {
return &Register{
Client: cli,
}
}
type Register struct {
Client client.Client
}
func (p *Register) Register(ctx context.Context, cfg config.Runner) (*core.Runner, error) {
labels := make([]string, len(cfg.Labels))
for i, v := range cfg.Labels {
labels[i] = strings.SplitN(v, ":", 2)[0]
}
// register new runner.
resp, err := p.Client.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{
Name: cfg.Name,
Token: cfg.Token,
AgentLabels: labels,
}))
if err != nil {
log.WithError(err).Error("poller: cannot register new runner")
return nil, err
}
data := &core.Runner{
ID: resp.Msg.Runner.Id,
UUID: resp.Msg.Runner.Uuid,
Name: resp.Msg.Runner.Name,
Token: resp.Msg.Runner.Token,
Address: p.Client.Address(),
Insecure: strconv.FormatBool(p.Client.Insecure()),
Labels: cfg.Labels,
}
file, err := json.MarshalIndent(data, "", " ")
if err != nil {
log.WithError(err).Error("poller: cannot marshal the json input")
return data, err
}
// store runner config in .runner file
return data, os.WriteFile(cfg.File, file, 0o644)
}

26
runtime/label.go Normal file
View file

@ -0,0 +1,26 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package runtime
import (
"fmt"
"strings"
)
func ParseLabel(str string) (label, schema, arg string, err error) {
splits := strings.SplitN(str, ":", 3)
label = splits[0]
schema = "host"
arg = ""
if len(splits) >= 2 {
schema = splits[1]
}
if len(splits) >= 3 {
arg = splits[2]
}
if schema != "host" && schema != "docker" {
return "", "", "", fmt.Errorf("unsupported schema: %s", schema)
}
return
}

63
runtime/label_test.go Normal file
View file

@ -0,0 +1,63 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package runtime
import "testing"
func TestParseLabel(t *testing.T) {
tests := []struct {
args string
wantLabel string
wantSchema string
wantArg string
wantErr bool
}{
{
args: "ubuntu:docker://node:18",
wantLabel: "ubuntu",
wantSchema: "docker",
wantArg: "//node:18",
wantErr: false,
},
{
args: "ubuntu:host",
wantLabel: "ubuntu",
wantSchema: "host",
wantArg: "",
wantErr: false,
},
{
args: "ubuntu",
wantLabel: "ubuntu",
wantSchema: "host",
wantArg: "",
wantErr: false,
},
{
args: "ubuntu:vm:ubuntu-18.04",
wantLabel: "",
wantSchema: "",
wantArg: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.args, func(t *testing.T) {
gotLabel, gotSchema, gotArg, err := ParseLabel(tt.args)
if (err != nil) != tt.wantErr {
t.Errorf("parseLabel() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotLabel != tt.wantLabel {
t.Errorf("parseLabel() gotLabel = %v, want %v", gotLabel, tt.wantLabel)
}
if gotSchema != tt.wantSchema {
t.Errorf("parseLabel() gotSchema = %v, want %v", gotSchema, tt.wantSchema)
}
if gotArg != tt.wantArg {
t.Errorf("parseLabel() gotArg = %v, want %v", gotArg, tt.wantArg)
}
})
}
}

View file

@ -99,7 +99,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
var step *runnerv1.StepState
if v, ok := entry.Data["stepNumber"]; ok {
if v, ok := v.(int); ok {
if v, ok := v.(int); ok && len(r.state.Steps) > v {
step = r.state.Steps[v]
}
}

View file

@ -7,6 +7,7 @@ import (
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"codeberg.org/forgejo/runner/artifactcache"
"codeberg.org/forgejo/runner/client"
log "github.com/sirupsen/logrus"
)
// Runner runs the pipeline.
@ -17,6 +18,7 @@ type Runner struct {
Environ map[string]string
Client client.Client
Labels []string
Network string
CacheHandler *artifactcache.Handler
}
@ -26,33 +28,31 @@ func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
for k, v := range s.Environ {
env[k] = v
}
env["ACTIONS_CACHE_URL"] = s.CacheHandler.ExternalURL() + "/"
return NewTask(s.ForgeInstance, task.Id, s.Client, env, s.platformPicker).Run(ctx, task, s.Machine, s.Version)
if s.CacheHandler != nil {
env["ACTIONS_CACHE_URL"] = s.CacheHandler.ExternalURL() + "/"
}
return NewTask(task.Id, s.Client, env, s.Network, s.platformPicker).Run(ctx, task, s.Machine, s.Version)
}
func (s *Runner) platformPicker(labels []string) string {
// "ubuntu-18.04:docker://node:16-buster"
// "self-hosted"
platforms := make(map[string]string, len(labels))
platforms := make(map[string]string, len(s.Labels))
for _, l := range s.Labels {
// "ubuntu-18.04:docker://node:16-buster"
splits := strings.SplitN(l, ":", 2)
if len(splits) == 1 {
// identifier for non docker execution environment
platforms[splits[0]] = "-self-hosted"
label, schema, arg, err := ParseLabel(l)
if err != nil {
log.Errorf("invaid label %q: %v", l, err)
continue
}
// ["ubuntu-18.04", "docker://node:16-buster"]
k, v := splits[0], splits[1]
if prefix := "docker://"; !strings.HasPrefix(v, prefix) {
switch schema {
case "docker":
// TODO "//" will be ignored, maybe we should use 'ubuntu-18.04:docker:node:16-buster' instead
platforms[label] = strings.TrimPrefix(arg, "//")
case "host":
platforms[label] = "-self-hosted"
default:
// It should not happen, because ParseLabel has checked it.
continue
} else {
v = strings.TrimPrefix(v, prefix)
}
// ubuntu-18.04 => node:16-buster
platforms[k] = v
}
for _, label := range labels {
@ -67,6 +67,8 @@ func (s *Runner) platformPicker(labels []string) string {
// ["with-gpu"] => "linux:with-gpu"
// ["ubuntu-22.04", "with-gpu"] => "ubuntu:22.04_with-gpu"
// return default
return "node:16-bullseye"
// return default.
// So the runner receives a task with a label that the runner doesn't have,
// it happens when the user have edited the label of the runner in the web UI.
return "node:16-bullseye" // TODO: it may be not correct, what if the runner is used as host mode only?
}

View file

@ -71,11 +71,11 @@ type Task struct {
}
// NewTask creates a new task
func NewTask(forgeInstance string, buildID int64, client client.Client, runnerEnvs map[string]string, picker func([]string) string) *Task {
func NewTask(buildID int64, client client.Client, runnerEnvs map[string]string, network string, picker func([]string) string) *Task {
task := &Task{
Input: &TaskInput{
envs: runnerEnvs,
containerNetworkMode: "bridge", // TODO should be configurable
containerNetworkMode: network,
},
BuildID: buildID,