mirror of
https://gitea.com/gitea/act_runner.git
synced 2025-06-13 01:57:13 +02:00
Reduce unnecessary DB queries for Actions tasks (#219)
implement: https://github.com/go-gitea/gitea/issues/24544 Changes: - Add a global variable `tasksVersion` to store the lastest version number which returned by Gitea. - Pass `tasksVersion` to Gitea when invoking `FetchTask`. - If there is no task in the `FetchTask` response, store the version from the `FetchTask` response into `tasksVersion` variable. - If there is a task in the `FetchTask` response, set `tasksVersion` to zero to focre query db in next `FetchTask` request. Related: - Protocol: https://gitea.com/gitea/actions-proto-def/pulls/10 - Gitea side: https://github.com/go-gitea/gitea/pull/25199 Reviewed-on: https://gitea.com/gitea/act_runner/pulls/219 Co-authored-by: sillyguodong <gedong_1994@163.com> Co-committed-by: sillyguodong <gedong_1994@163.com>
This commit is contained in:
@ -8,6 +8,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"github.com/bufbuild/connect-go"
|
||||
@ -20,9 +21,10 @@ import (
|
||||
)
|
||||
|
||||
type Poller struct {
|
||||
client client.Client
|
||||
runner *run.Runner
|
||||
cfg *config.Config
|
||||
client client.Client
|
||||
runner *run.Runner
|
||||
cfg *config.Config
|
||||
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||
@ -77,7 +79,11 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
|
||||
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
|
||||
// Load the version value that was in the cache when the request was sent.
|
||||
v := p.tasksVersion.Load()
|
||||
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
|
||||
TasksVersion: v,
|
||||
}))
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
err = nil
|
||||
}
|
||||
@ -86,8 +92,20 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if resp == nil || resp.Msg == nil || resp.Msg.Task == nil {
|
||||
if resp == nil || resp.Msg == nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if resp.Msg.TasksVersion > v {
|
||||
p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
|
||||
}
|
||||
|
||||
if resp.Msg.Task == nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// got a task, set `tasksVersion` to zero to focre query db in next request.
|
||||
p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
|
||||
|
||||
return resp.Msg.Task, true
|
||||
}
|
||||
|
Reference in New Issue
Block a user