chore(runner): support update log and task

Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com>
Bo-Yi.Wu
2022-09-25 18:54:00 +08:00
committed by Jason Song
parent 20c3d85ba9
commit 82431d8e11
11 changed files with 489 additions and 375 deletions
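For orientation before the file-by-file diff: the new runtime/reporter.go below adds a Reporter that buffers log rows and task state on the runner and pushes them to the server through two connect-go RPCs, UpdateLog and UpdateTask, replacing the old taskLogHook/UpdateStep reporting that this commit removes. A minimal sketch of those two calls, using only request fields that appear in the new file (flushOnce and its parameters are illustrative, not part of the commit):

// Sketch only; not part of this commit.
package runtime

import (
    "context"

    "gitea.com/gitea/act_runner/client"
    runnerv1 "gitea.com/gitea/proto-go/runner/v1"
    "github.com/bufbuild/connect-go"
)

// flushOnce shows one reporting tick: push the buffered log rows starting at
// offset off, then push a snapshot of the task state.
func flushOnce(ctx context.Context, c client.Client, taskID int64, off int,
    rows []*runnerv1.LogRow, state *runnerv1.TaskState) error {
    if _, err := c.UpdateLog(ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
        TaskId: taskID,
        Index:  int64(off), // index of the first row in rows
        Rows:   rows,
        NoMore: false, // Reporter.Close sets this to true on the final flush
    })); err != nil {
        return err
    }
    _, err := c.UpdateTask(ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
        State: state, // snapshot of the runner-side TaskState
    }))
    return err
}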

runtime/reporter.go Normal file
View File

@@ -0,0 +1,263 @@
package runtime

import (
    "context"
    "fmt"
    "strings"
    "sync"
    "time"

    "gitea.com/gitea/act_runner/client"
    runnerv1 "gitea.com/gitea/proto-go/runner/v1"

    "github.com/avast/retry-go/v4"
    "github.com/bufbuild/connect-go"
    log "github.com/sirupsen/logrus"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/timestamppb"
)

type Reporter struct {
    ctx context.Context

    closed  bool
    client  client.Client
    clientM sync.Mutex

    logOffset int
    logRows   []*runnerv1.LogRow

    state  *runnerv1.TaskState
    stateM sync.RWMutex
}

func NewReporter(ctx context.Context, client client.Client, taskID int64) *Reporter {
    return &Reporter{
        ctx:    ctx,
        client: client,
        state: &runnerv1.TaskState{
            Id: taskID,
        },
    }
}

func (r *Reporter) ResetSteps(l int) {
    r.stateM.Lock()
    defer r.stateM.Unlock()
    for i := 0; i < l; i++ {
        r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
            Id: int64(i),
        })
    }
}

func (r *Reporter) Levels() []log.Level {
    return log.AllLevels
}

func (r *Reporter) Fire(entry *log.Entry) error {
    r.stateM.Lock()
    defer r.stateM.Unlock()

    timestamp := entry.Time
    if r.state.StartedAt == nil {
        r.state.StartedAt = timestamppb.New(timestamp)
    }

    var step *runnerv1.StepState
    if v, ok := entry.Data["stepNumber"]; ok {
        if v, ok := v.(int); ok {
            step = r.state.Steps[v]
        }
    }

    if step == nil {
        if v, ok := entry.Data["jobResult"]; ok {
            if v, ok := v.(string); ok {
                if jobResult := r.parseResult(v); jobResult != runnerv1.Result_RESULT_UNSPECIFIED {
                    r.state.Result = jobResult
                    r.state.StoppedAt = timestamppb.New(timestamp)
                    for _, s := range r.state.Steps {
                        if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {
                            s.Result = runnerv1.Result_RESULT_CANCELLED
                        }
                    }
                }
            }
        }
        if !r.duringSteps() {
            r.logRows = append(r.logRows, r.parseLogRow(entry))
        }
        return nil
    }

    if step.StartedAt == nil {
        step.StartedAt = timestamppb.New(timestamp)
    }
    if v, ok := entry.Data["raw_output"]; ok {
        if rawOutput, ok := v.(bool); ok && rawOutput {
            if step.LogLength == 0 {
                step.LogIndex = int64(r.logOffset + len(r.logRows))
            }
            step.LogLength++
            r.logRows = append(r.logRows, r.parseLogRow(entry))
            return nil
        }
    }

    log.Info(entry.Data)

    if v, ok := entry.Data["stepResult"]; ok {
        if v, ok := v.(string); ok {
            if stepResult := r.parseResult(v); stepResult != runnerv1.Result_RESULT_UNSPECIFIED {
                step.Result = stepResult
                step.StoppedAt = timestamppb.New(timestamp)
            }
        }
    }

    return nil
}

func (r *Reporter) RunDaemon() {
    if r.closed {
        return
    }
    if r.ctx.Err() != nil {
        return
    }

    _ = r.ReportLog(false)
    _ = r.ReportState()

    time.AfterFunc(time.Second, r.RunDaemon)
}

func (r *Reporter) Logf(format string, a ...interface{}) {
    r.stateM.Lock()
    defer r.stateM.Unlock()

    if !r.duringSteps() {
        r.logRows = append(r.logRows, &runnerv1.LogRow{
            Time:    timestamppb.Now(),
            Content: fmt.Sprintf(format, a...),
        })
    }
}

func (r *Reporter) Close(lastWords string) error {
    r.closed = true

    r.stateM.Lock()
    if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
        if lastWords == "" {
            lastWords = "Early termination"
        }
        for _, v := range r.state.Steps {
            if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
                v.Result = runnerv1.Result_RESULT_CANCELLED
            }
        }
        r.logRows = append(r.logRows, &runnerv1.LogRow{
            Time:    timestamppb.Now(),
            Content: lastWords,
        })
        return nil
    } else if lastWords != "" {
        r.logRows = append(r.logRows, &runnerv1.LogRow{
            Time:    timestamppb.Now(),
            Content: lastWords,
        })
    }
    r.stateM.Unlock()

    if err := retry.Do(func() error {
        if err := r.ReportLog(true); err != nil {
            return err
        }
        return r.ReportState()
    }, retry.Context(r.ctx)); err != nil {
        return err
    }
    return nil
}

func (r *Reporter) ReportLog(noMore bool) error {
    r.clientM.Lock()
    defer r.clientM.Unlock()

    r.stateM.RLock()
    rows := r.logRows
    r.stateM.RUnlock()

    resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
        TaskId: r.state.Id,
        Index:  int64(r.logOffset),
        Rows:   rows,
        NoMore: noMore,
    }))
    if err != nil {
        return err
    }

    ack := int(resp.Msg.AckIndex)
    if ack < r.logOffset {
        return fmt.Errorf("submitted logs are lost")
    }

    r.stateM.Lock()
    r.logRows = r.logRows[ack-r.logOffset:]
    r.logOffset = ack
    r.stateM.Unlock()

    if noMore && ack < r.logOffset+len(rows) {
        return fmt.Errorf("not all logs are submitted")
    }

    return nil
}

func (r *Reporter) ReportState() error {
    r.clientM.Lock()
    defer r.clientM.Unlock()

    r.stateM.RLock()
    state := proto.Clone(r.state).(*runnerv1.TaskState)
    r.stateM.RUnlock()

    _, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
        State: state,
    }))
    return err
}

func (r *Reporter) duringSteps() bool {
    if steps := r.state.Steps; len(steps) == 0 {
        return false
    } else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {
        return false
    } else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {
        return false
    }
    return true
}

func (r *Reporter) parseResult(s string) runnerv1.Result {
    switch s {
    case "success":
        return runnerv1.Result_RESULT_SUCCESS
    case "failure":
        return runnerv1.Result_RESULT_FAILURE
    case "skipped":
        return runnerv1.Result_RESULT_SKIPPED
    case "cancelled":
        return runnerv1.Result_RESULT_CANCELLED
    }
    return runnerv1.Result_RESULT_UNSPECIFIED
}

func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {
    return &runnerv1.LogRow{
        Time:    timestamppb.New(entry.Time),
        Content: strings.TrimSuffix(entry.Message, "\r\n"),
    }
}
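
The least obvious part of the file above is the offset bookkeeping in ReportLog. A short walkthrough with invented numbers (assumption: the server persists a prefix of the submitted rows and acknowledges it via AckIndex):

// Suppose logOffset = 10 and logRows currently buffers rows 10..14.
// ReportLog sends UpdateLogRequest{Index: 10, Rows: <rows 10..14>}.
// The server replies AckIndex = 13, meaning rows 10..12 are persisted.
// logRows = logRows[13-10:]  -> rows 13 and 14 stay buffered
// logOffset = 13             -> the next flush starts at index 13
// An AckIndex below logOffset would mean already-acknowledged rows were lost
// ("submitted logs are lost"); the final flush from Close (NoMore: true) is
// additionally expected to leave nothing unacknowledged.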

View File

@@ -7,6 +7,7 @@ import (
"gitea.com/gitea/act_runner/client"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
"github.com/bufbuild/connect-go"
log "github.com/sirupsen/logrus"
)
@@ -26,37 +27,31 @@ type Runner struct {
}
// Run runs the pipeline stage.
func (s *Runner) Run(ctx context.Context, stage *runnerv1.Stage) error {
func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
l := log.
WithField("stage.id", stage.Id).
WithField("stage.name", stage.Name)
WithField("task.id", task.Id)
l.Info("start running pipeline")
// update machine in stage
stage.Machine = s.Machine
data, err := s.Client.Detail(ctx, &runnerv1.DetailRequest{
Stage: stage,
})
task.Machine = s.Machine
data, err := s.Client.Detail(ctx, connect.NewRequest(&runnerv1.DetailRequest{
Task: task,
}))
if err != nil && err == ErrDataLock {
l.Info("stage accepted by another runner")
l.Info("task accepted by another runner")
return nil
}
if err != nil {
l.WithError(err).Error("cannot accept stage")
l.WithError(err).Error("cannot accept task")
return err
}
l = log.WithField("repo.id", data.Repo.Id).
WithField("repo.name", data.Repo.Name).
WithField("build.id", data.Build.Id).
WithField("build.name", data.Build.Name)
l.Info("task details fetched")
l.Info("stage details fetched")
return s.run(ctx, data)
return s.run(ctx, data.Msg.Task)
}
func (s *Runner) run(ctx context.Context, data *runnerv1.DetailResponse) error {
return NewTask(data.Build.Id, s.Client).Run(ctx, data)
func (s *Runner) run(ctx context.Context, task *runnerv1.Task) error {
return NewTask(task.Id, s.Client).Run(ctx, task)
}
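
The mechanical change in this file is the switch to connect-go's wrapped request/response types: requests are built with connect.NewRequest and the response payload lives under .Msg, which is why the code now reads data.Msg.Task. A hedged sketch of the pattern (fetchTask is an illustrative helper, not part of the commit):

// Sketch only; not part of this commit.
package runtime

import (
    "context"

    "gitea.com/gitea/act_runner/client"
    runnerv1 "gitea.com/gitea/proto-go/runner/v1"
    "github.com/bufbuild/connect-go"
)

// fetchTask mirrors the wrapped Detail call used by Runner.Run above.
func fetchTask(ctx context.Context, c client.Client, task *runnerv1.Task) (*runnerv1.Task, error) {
    resp, err := c.Detail(ctx, connect.NewRequest(&runnerv1.DetailRequest{
        Task: task,
    }))
    if err != nil {
        return nil, err
    }
    // connect-go wraps the protobuf response; the message itself is under Msg.
    return resp.Msg.Task, nil
}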

View File

@@ -1,12 +1,12 @@
package runtime
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"gitea.com/gitea/act_runner/client"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
@@ -57,38 +57,6 @@ type TaskInput struct {
EnvFile string
}
type taskLogHook struct {
entries []*log.Entry
lock sync.Mutex
}
func (h *taskLogHook) Levels() []log.Level {
return log.AllLevels
}
func (h *taskLogHook) Fire(entry *log.Entry) error {
if flag, ok := entry.Data["raw_output"]; ok {
h.lock.Lock()
if flagVal, ok := flag.(bool); flagVal && ok {
log.Infof("task log: %s", entry.Message)
h.entries = append(h.entries, entry)
}
h.lock.Unlock()
}
return nil
}
func (h *taskLogHook) swapLogs() []*log.Entry {
if len(h.entries) == 0 {
return nil
}
h.lock.Lock()
entries := h.entries
h.entries = nil
h.lock.Unlock()
return entries
}
type TaskState int
const (
@@ -112,13 +80,12 @@ type Task struct {
BuildID int64
Input *TaskInput
logHook *taskLogHook
state TaskState
client client.Client
log *log.Entry
state TaskState
client client.Client
log *log.Entry
}
// newTask creates a new task
// NewTask creates a new task
func NewTask(buildID int64, client client.Client) *Task {
task := &Task{
Input: &TaskInput{
@@ -127,10 +94,9 @@ func NewTask(buildID int64, client client.Client) *Task {
},
BuildID: buildID,
state: TaskStatePending,
client: client,
log: log.WithField("buildID", buildID),
logHook: &taskLogHook{},
state: TaskStatePending,
client: client,
log: log.WithField("buildID", buildID),
}
task.Input.repoDirectory, _ = os.Getwd()
return task
@@ -157,139 +123,96 @@ func demoPlatforms() map[string]string {
}
}
// reportFailure reports the failure of the task
func (t *Task) reportFailure(ctx context.Context, err error) {
t.state = TaskStateFailure
finishTask(t.BuildID)
t.log.Errorf("task failed: %v", err)
if t.client == nil {
// TODO: fill the step request
stepRequest := &runnerv1.UpdateStepRequest{}
_ = t.client.UpdateStep(ctx, stepRequest)
return
}
}
func (t *Task) startReporting(ctx context.Context, interval int64) {
for {
time.Sleep(time.Duration(interval) * time.Second)
if t.state == TaskStateSuccess || t.state == TaskStateFailure {
t.log.Debugf("task reporting stopped")
break
}
t.reportStep(ctx)
}
}
// reportStep reports the step of the task
func (t *Task) reportStep(ctx context.Context) {
if t.client == nil {
return
}
logValues := t.logHook.swapLogs()
if len(logValues) == 0 {
t.log.Debugf("no log to report")
return
}
t.log.Infof("reporting %d logs", len(logValues))
// TODO: fill the step request
stepRequest := &runnerv1.UpdateStepRequest{}
_ = t.client.UpdateStep(ctx, stepRequest)
}
// reportSuccess reports the success of the task
func (t *Task) reportSuccess(ctx context.Context) {
t.state = TaskStateSuccess
finishTask(t.BuildID)
t.log.Infof("task success")
if t.client == nil {
return
}
// TODO: fill the step request
stepRequest := &runnerv1.UpdateStepRequest{}
_ = t.client.UpdateStep(ctx, stepRequest)
}
func (t *Task) Run(ctx context.Context, data *runnerv1.DetailResponse) error {
_, exist := globalTaskMap.Load(data.Build.Id)
func (t *Task) Run(ctx context.Context, task *runnerv1.Task) error {
_, exist := globalTaskMap.Load(task.Id)
if exist {
return fmt.Errorf("task %d already exists", data.Build.Id)
return fmt.Errorf("task %d already exists", task.Id)
}
// set task to the global map
// when task is done or canceled, it will be removed from the map
globalTaskMap.Store(data.Build.Id, t)
globalTaskMap.Store(task.Id, t)
defer globalTaskMap.Delete(task.Id)
lastWords := ""
reporter := NewReporter(ctx, t.client, task.Id)
defer func() {
_ = reporter.Close(lastWords)
}()
reporter.RunDaemon()
reporter.Logf("received task %v of job %v", task.Id, task.Context.Fields["job"].GetStringValue())
workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory)
if err != nil {
t.reportFailure(ctx, err)
lastWords = err.Error()
return err
}
t.log.Debugf("workflows path: %s", workflowsPath)
planner, err := model.NewWorkflowPlanner(workflowsPath, false)
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
if err != nil {
t.reportFailure(ctx, err)
lastWords = err.Error()
return err
}
var eventName string
events := planner.GetEvents()
if len(events) > 0 {
// set default event type to first event
// this way the user doesn't have to specify the event.
t.log.Debugf("Using detected workflow event: %s", events[0])
eventName = events[0]
var plan *model.Plan
if jobIDs := workflow.GetJobIDs(); len(jobIDs) != 1 {
err := fmt.Errorf("multiple jobs found: %v", jobIDs)
lastWords = err.Error()
return err
} else {
if plan := planner.PlanEvent("push"); plan != nil {
eventName = "push"
}
jobID := jobIDs[0]
plan = model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
job := workflow.GetJob(jobID)
reporter.ResetSteps(len(job.Steps))
}
// build the plan for this run
var plan *model.Plan
jobID := ""
if t.BuildID > 0 {
jobID = fmt.Sprintf("%d", t.BuildID)
}
if jobID != "" {
t.log.Infof("Planning job: %s", jobID)
plan = planner.PlanJob(jobID)
} else {
t.log.Infof("Planning event: %s", eventName)
plan = planner.PlanEvent(eventName)
}
log.Infof("plan: %+v", plan.Stages[0].Runs)
curDir, err := os.Getwd()
if err != nil {
t.reportFailure(ctx, err)
lastWords = err.Error()
return err
}
dataContext := task.Context.Fields
preset := &model.GithubContext{
Event: dataContext["event"].GetStructValue().AsMap(),
RunID: dataContext["run_id"].GetStringValue(),
RunNumber: dataContext["run_number"].GetStringValue(),
Actor: dataContext["actor"].GetStringValue(),
Repository: dataContext["repository"].GetStringValue(),
EventName: dataContext["event_name"].GetStringValue(),
Sha: dataContext["sha"].GetStringValue(),
Ref: dataContext["ref"].GetStringValue(),
RefName: dataContext["ref_name"].GetStringValue(),
RefType: dataContext["ref_type"].GetStringValue(),
HeadRef: dataContext["head_ref"].GetStringValue(),
BaseRef: dataContext["base_ref"].GetStringValue(),
Token: dataContext["token"].GetStringValue(),
RepositoryOwner: dataContext["repository_owner"].GetStringValue(),
RetentionDays: dataContext["retention_days"].GetStringValue(),
}
eventJSON, err := json.Marshal(preset.Event)
if err != nil {
lastWords = err.Error()
return err
}
// run the plan
input := t.Input
config := &runner.Config{
Actor: input.actor,
EventName: eventName,
EventPath: "",
DefaultBranch: "",
ForcePull: input.forcePull,
ForceRebuild: input.forceRebuild,
ReuseContainers: input.reuseContainers,
Workdir: curDir,
BindWorkdir: input.bindWorkdir,
LogOutput: true,
JSONLogger: input.jsonLogger,
// Env: envs,
// Secrets: secrets,
Workdir: curDir, // TODO: temp dir?
BindWorkdir: input.bindWorkdir,
ReuseContainers: input.reuseContainers,
ForcePull: input.forcePull,
ForceRebuild: input.forceRebuild,
LogOutput: true,
JSONLogger: input.jsonLogger,
Secrets: task.Secrets,
InsecureSecrets: input.insecureSecrets,
Platforms: demoPlatforms(),
Platforms: demoPlatforms(), // TODO: supported platforms
Privileged: input.privileged,
UsernsMode: input.usernsMode,
ContainerArchitecture: input.containerArchitecture,
@@ -302,11 +225,12 @@ func (t *Task) Run(ctx context.Context, data *runnerv1.DetailResponse) error {
ArtifactServerPath: input.artifactServerPath,
ArtifactServerPort: input.artifactServerPort,
NoSkipCheckout: input.noSkipCheckout,
// RemoteName: input.remoteName,
PresetGitHubContext: preset,
EventJSON: string(eventJSON),
}
r, err := runner.New(config)
if err != nil {
t.reportFailure(ctx, err)
lastWords = err.Error()
return err
}
@@ -319,17 +243,15 @@ func (t *Task) Run(ctx context.Context, data *runnerv1.DetailResponse) error {
})
t.log.Infof("workflow prepared")
reporter.Logf("workflow prepared")
// add logger recorders
ctx = common.WithLoggerHook(ctx, t.logHook)
go t.startReporting(ctx, 1)
ctx = common.WithLoggerHook(ctx, reporter)
if err := executor(ctx); err != nil {
t.reportFailure(ctx, err)
lastWords = err.Error()
return err
}
t.reportSuccess(ctx)
return nil
}
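
Condensed, the reporter lifecycle inside the rewritten Task.Run comes down to a handful of calls. A sketch with plan construction and error handling trimmed (runTask is an illustrative helper; the common package is assumed to be act's pkg/common, as used above):

// Sketch only; not part of this commit.
package runtime

import (
    "context"

    "gitea.com/gitea/act_runner/client"
    runnerv1 "gitea.com/gitea/proto-go/runner/v1"
    "github.com/nektos/act/pkg/common"
)

// runTask condenses how Task.Run drives the Reporter.
func runTask(ctx context.Context, c client.Client, task *runnerv1.Task, executor common.Executor) error {
    reporter := NewReporter(ctx, c, task.Id)
    lastWords := ""
    defer func() {
        _ = reporter.Close(lastWords) // final flush, retried via retry-go
    }()

    reporter.RunDaemon() // re-arms itself every second: ReportLog + ReportState
    reporter.Logf("received task %v", task.Id)

    // Route act's logrus entries (raw_output, stepNumber, stepResult, jobResult)
    // into Reporter.Fire through the logger hook.
    ctx = common.WithLoggerHook(ctx, reporter)

    if err := executor(ctx); err != nil {
        lastWords = err.Error()
        return err
    }
    return nil
}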