refactor functions

This commit is contained in:
Lunny Xiao 2022-06-20 17:23:34 +08:00 committed by Jason Song
parent 5903c08c14
commit c7cb750616
2 changed files with 111 additions and 98 deletions

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"os" "os"
"os/signal" "os/signal"
"strings" "strings"
@ -35,6 +36,99 @@ const (
MsgTypeBuildResult // build result MsgTypeBuildResult // build result
) )
func handleVersion1(conn *websocket.Conn, sigs chan os.Signal, message []byte, msg *Message) error {
switch msg.Type {
case MsgTypeRegister:
log.Info().Msgf("received registered success: %s", message)
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeError:
log.Info().Msgf("received error msessage: %s", message)
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeIdle:
log.Info().Msgf("received no task")
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeRequestBuild:
switch msg.EventName {
case "push":
input := Input{
forgeInstance: "github.com",
reuseContainers: true,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
done := make(chan error)
go func(chan error) {
done <- runTask(ctx, &input, "")
}(done)
c := time.NewTicker(time.Second)
defer c.Stop()
for {
select {
case <-sigs:
cancel()
log.Info().Msgf("cancel task")
return nil
case err := <-done:
if err != nil {
log.Error().Msgf("runTask failed: %v", err)
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
ErrCode: 1,
ErrContent: err.Error(),
})
}
log.Error().Msgf("runTask success")
return conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
})
case <-c.C:
}
}
default:
return fmt.Errorf("unknow event %s with payload %s", msg.EventName, msg.EventPayload)
}
default:
return fmt.Errorf("received a message with an unsupported type: %#v", msg)
}
}
// TODO: handle the message
func handleMessage(conn *websocket.Conn, sigs chan os.Signal, message []byte) error {
var msg Message
if err := json.Unmarshal(message, &msg); err != nil {
return fmt.Errorf("unmarshal received message faild: %v", err)
}
switch msg.Version {
case 1:
return handleVersion1(conn, sigs, message, &msg)
default:
return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner")
}
}
func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
log.Info().Msgf("Starting runner daemon") log.Info().Msgf("Starting runner daemon")
@ -44,8 +138,15 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
defer ticker.Stop() defer ticker.Stop()
var failedCnt int var failedCnt int
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
for { for {
select { select {
case <-sigs:
log.Info().Msgf("cancel task")
return nil
case <-ctx.Done(): case <-ctx.Done():
if conn != nil { if conn != nil {
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
@ -89,6 +190,13 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
const timeout = time.Second * 10 const timeout = time.Second * 10
for { for {
select {
case <-sigs:
log.Info().Msgf("cancel task")
return nil
default:
}
conn.SetReadDeadline(time.Now().Add(timeout)) conn.SetReadDeadline(time.Now().Add(timeout))
conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(timeout)); return nil }) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(timeout)); return nil })
@ -110,101 +218,10 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
} }
failedCnt = 0 failedCnt = 0
} }
break return nil
} }
// TODO: handle the message if err := handleMessage(conn, sigs, message); err != nil {
var msg Message
if err = json.Unmarshal(message, &msg); err != nil {
log.Error().Msgf("unmarshal received message faild: %v", err)
continue
}
switch msg.Version {
case 1:
switch msg.Type {
case MsgTypeRegister:
log.Info().Msgf("received registered success: %s", message)
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeError:
log.Info().Msgf("received error msessage: %s", message)
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeIdle:
log.Info().Msgf("received no task")
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeRequestBuild:
switch msg.EventName {
case "push":
input := Input{
forgeInstance: "github.com",
reuseContainers: true,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
done := make(chan error)
go func(chan error) {
done <- runTask(ctx, &input, "")
}(done)
c := time.NewTicker(time.Second)
defer c.Stop()
END:
for {
select {
case <-sigs:
cancel()
log.Info().Msgf("cancel task")
break END
case err := <-done:
if err != nil {
log.Error().Msgf("runTask failed: %v", err)
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
ErrCode: 1,
ErrContent: err.Error(),
})
} else {
log.Error().Msgf("runTask success")
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
})
}
break END
case <-c.C:
}
}
default:
log.Warn().Msgf("unknow event %s with payload %s", msg.EventName, msg.EventPayload)
}
default:
log.Error().Msgf("received a message with an unsupported type: %#v", msg)
}
default:
log.Error().Msgf("recevied a message with an unsupported version, consider upgrade your runner")
} }
} }
} }

View file

@ -7,7 +7,6 @@ import (
"path/filepath" "path/filepath"
"github.com/nektos/act/pkg/artifacts" "github.com/nektos/act/pkg/artifacts"
"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/model"
"github.com/nektos/act/pkg/runner" "github.com/nektos/act/pkg/runner"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -193,10 +192,7 @@ func runTask(ctx context.Context, input *Input, jobID string) error {
return fmt.Errorf("New config failed: %v", err) return fmt.Errorf("New config failed: %v", err)
} }
log := logrus.StandardLogger() logrus.AddHook(&StepHook{})
log.AddHook(&StepHook{})
ctx = common.WithLogger(ctx, log)
cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)