feat(dhcp): Cancel backoff retry on stop

Signed-off-by: Songmin Li <lisongmin@protonmail.com>
This commit is contained in:
Songmin Li 2024-09-20 08:26:02 +08:00 committed by Casey Callendrello
parent d61e7e5e1f
commit a4fc6f93c7
5 changed files with 72 additions and 51 deletions

View File

@ -39,19 +39,21 @@ import (
var errNoMoreTries = errors.New("no more tries")
type DHCP struct {
mux sync.Mutex
leases map[string]*DHCPLease
hostNetnsPrefix string
clientTimeout time.Duration
clientResendMax time.Duration
broadcast bool
mux sync.Mutex
leases map[string]*DHCPLease
hostNetnsPrefix string
clientTimeout time.Duration
clientResendMax time.Duration
clientResendTimeout time.Duration
broadcast bool
}
func newDHCP(clientTimeout, clientResendMax time.Duration) *DHCP {
func newDHCP(clientTimeout, clientResendMax time.Duration, resendTimeout time.Duration) *DHCP {
return &DHCP{
leases: make(map[string]*DHCPLease),
clientTimeout: clientTimeout,
clientResendMax: clientResendMax,
leases: make(map[string]*DHCPLease),
clientTimeout: clientTimeout,
clientResendMax: clientResendMax,
clientResendTimeout: resendTimeout,
}
}
@ -90,7 +92,7 @@ func (d *DHCP) Allocate(args *skel.CmdArgs, result *current.Result) error {
hostNetns := d.hostNetnsPrefix + args.Netns
l, err = AcquireLease(clientID, hostNetns, args.IfName,
opts,
d.clientTimeout, d.clientResendMax, d.broadcast)
d.clientTimeout, d.clientResendMax, d.clientResendTimeout, d.broadcast)
if err != nil {
return err
}
@ -190,7 +192,8 @@ func getListener(socketPath string) (net.Listener, error) {
func runDaemon(
pidfilePath, hostPrefix, socketPath string,
dhcpClientTimeout time.Duration, resendMax time.Duration, broadcast bool,
dhcpClientTimeout time.Duration, resendMax time.Duration, resendTimeout time.Duration,
broadcast bool,
) error {
// since other goroutines (on separate threads) will change namespaces,
// ensure the RPC server does not get scheduled onto those
@ -225,7 +228,7 @@ func runDaemon(
done <- true
}()
dhcp := newDHCP(dhcpClientTimeout, resendMax)
dhcp := newDHCP(dhcpClientTimeout, resendMax, resendTimeout)
dhcp.hostNetnsPrefix = hostPrefix
dhcp.broadcast = broadcast
rpc.Register(dhcp)

View File

@ -66,7 +66,7 @@ var _ = Describe("DHCP Multiple Lease Operations", func() {
// Start the DHCP client daemon
dhcpPluginPath, err := exec.LookPath("dhcp")
Expect(err).NotTo(HaveOccurred())
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath)
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath, "--timeout", "2s", "--resendtimeout", "8s")
err = clientCmd.Start()
Expect(err).NotTo(HaveOccurred())
Expect(clientCmd.Process).NotTo(BeNil())

View File

@ -45,7 +45,8 @@ func getTmpDir() (string, error) {
}
type DhcpServer struct {
cmd *exec.Cmd
cmd *exec.Cmd
lock sync.Mutex
startAddr net.IP
endAddr net.IP
@ -53,6 +54,16 @@ type DhcpServer struct {
}
func (s *DhcpServer) Serve() error {
if err := s.Start(); err != nil {
return err
}
return s.cmd.Wait()
}
func (s *DhcpServer) Start() error {
s.lock.Lock()
defer s.lock.Unlock()
s.cmd = exec.Command(
"dnsmasq",
"--no-daemon",
@ -69,11 +80,9 @@ func (s *DhcpServer) Serve() error {
}
func (s *DhcpServer) Stop() error {
if err := s.cmd.Process.Kill(); err != nil {
return err
}
_, err := s.cmd.Process.Wait()
return err
s.lock.Lock()
defer s.lock.Unlock()
return s.cmd.Process.Kill()
}
func dhcpServerStart(netns ns.NetNS, numLeases int, stopCh <-chan bool) *sync.WaitGroup {
@ -535,7 +544,7 @@ var _ = Describe("DHCP Lease Unavailable Operations", func() {
// `go test` timeout with default delays. Since our DHCP server
// and client daemon are local processes anyway, we can depend on
// them to respond very quickly.
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath, "-timeout", "2s", "-resendmax", "8s")
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath, "-timeout", "2s", "-resendmax", "8s", "--resendtimeout", "10s")
// copy dhcp client's stdout/stderr to test stdout
var b bytes.Buffer

View File

@ -36,9 +36,10 @@ import (
// RFC 2131 suggests using exponential backoff, starting with 4sec
// and randomized to +/- 1sec
const (
resendDelay0 = 4 * time.Second
resendDelayMax = 62 * time.Second
defaultLeaseTime = 60 * time.Minute
resendDelay0 = 4 * time.Second
resendDelayMax = 62 * time.Second
defaultLeaseTime = 60 * time.Minute
defaultResendTimeout = 208 * time.Second // fast resend + backoff resend
)
// To speed up the retry for first few failures, we retry without
@ -69,6 +70,7 @@ type DHCPLease struct {
expireTime time.Time
timeout time.Duration
resendMax time.Duration
resendTimeout time.Duration
broadcast bool
stopping uint32
stop chan struct{}
@ -155,7 +157,7 @@ func prepareOptions(cniArgs string, provideOptions []ProvideOption, requestOptio
func AcquireLease(
clientID, netns, ifName string,
opts []dhcp4.Option,
timeout, resendMax time.Duration, broadcast bool,
timeout, resendMax time.Duration, resendTimeout time.Duration, broadcast bool,
) (*DHCPLease, error) {
errCh := make(chan error, 1)
@ -163,15 +165,16 @@ func AcquireLease(
ctx, cancel := context.WithCancel(ctx)
l := &DHCPLease{
clientID: clientID,
stop: make(chan struct{}),
check: make(chan struct{}),
timeout: timeout,
resendMax: resendMax,
broadcast: broadcast,
opts: opts,
cancelFunc: cancel,
ctx: ctx,
clientID: clientID,
stop: make(chan struct{}),
check: make(chan struct{}),
timeout: timeout,
resendMax: resendMax,
resendTimeout: resendTimeout,
broadcast: broadcast,
opts: opts,
cancelFunc: cancel,
ctx: ctx,
}
log.Printf("%v: acquiring lease", clientID)
@ -213,6 +216,7 @@ func AcquireLease(
func (l *DHCPLease) Stop() {
if atomic.CompareAndSwapUint32(&l.stopping, 0, 1) {
close(l.stop)
l.cancelFunc()
}
l.wg.Wait()
}
@ -251,9 +255,11 @@ func (l *DHCPLease) acquire() error {
}
defer c.Close()
pkt, err := backoffRetry(l.resendMax, func() (*nclient4.Lease, error) {
timeoutCtx, cancel := context.WithTimeoutCause(l.ctx, l.resendTimeout, errNoMoreTries)
defer cancel()
pkt, err := backoffRetry(timeoutCtx, l.resendMax, func() (*nclient4.Lease, error) {
return c.Request(
l.ctx,
timeoutCtx,
withClientID(l.clientID),
withAllOptions(l),
)
@ -351,9 +357,11 @@ func (l *DHCPLease) renew() error {
}
defer c.Close()
lease, err := backoffRetry(l.resendMax, func() (*nclient4.Lease, error) {
timeoutCtx, cancel := context.WithTimeoutCause(l.ctx, l.resendTimeout, errNoMoreTries)
defer cancel()
lease, err := backoffRetry(timeoutCtx, l.resendMax, func() (*nclient4.Lease, error) {
return c.Renew(
l.ctx,
timeoutCtx,
l.latestLease,
withClientID(l.clientID),
withAllOptions(l),
@ -441,7 +449,7 @@ func jitter(span time.Duration) time.Duration {
return time.Duration(float64(span) * (2.0*rand.Float64() - 1.0))
}
func backoffRetry(resendMax time.Duration, f func() (*nclient4.Lease, error)) (*nclient4.Lease, error) {
func backoffRetry(ctx context.Context, resendMax time.Duration, f func() (*nclient4.Lease, error)) (*nclient4.Lease, error) {
baseDelay := resendDelay0
var sleepTime time.Duration
fastRetryLimit := resendFastMax
@ -462,17 +470,16 @@ func backoffRetry(resendMax time.Duration, f func() (*nclient4.Lease, error)) (*
log.Printf("retrying in %f seconds", sleepTime.Seconds())
time.Sleep(sleepTime)
// only adjust delay time if we are in normal backoff stage
if baseDelay < resendMax && fastRetryLimit == 0 {
baseDelay *= 2
} else if fastRetryLimit == 0 { // only break if we are at normal delay
break
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
case <-time.After(sleepTime):
// only adjust delay time if we are in normal backoff stage
if baseDelay < resendMax && fastRetryLimit == 0 {
baseDelay *= 2
}
}
}
return nil, errNoMoreTries
}
func newDHCPClient(

View File

@ -80,20 +80,22 @@ func main() {
var broadcast bool
var timeout time.Duration
var resendMax time.Duration
var resendTimeout time.Duration
daemonFlags := flag.NewFlagSet("daemon", flag.ExitOnError)
daemonFlags.StringVar(&pidfilePath, "pidfile", "", "optional path to write daemon PID to")
daemonFlags.StringVar(&hostPrefix, "hostprefix", "", "optional prefix to host root")
daemonFlags.StringVar(&socketPath, "socketpath", "", "optional dhcp server socketpath")
daemonFlags.BoolVar(&broadcast, "broadcast", false, "broadcast DHCP leases")
daemonFlags.DurationVar(&timeout, "timeout", 10*time.Second, "optional dhcp client timeout duration")
daemonFlags.DurationVar(&resendMax, "resendmax", resendDelayMax, "optional dhcp client resend max duration")
daemonFlags.DurationVar(&timeout, "timeout", 10*time.Second, "optional dhcp client timeout duration for each request")
daemonFlags.DurationVar(&resendMax, "resendmax", resendDelayMax, "optional dhcp client max resend delay between requests")
daemonFlags.DurationVar(&resendTimeout, "resendtimeout", defaultResendTimeout, "optional dhcp client resend timeout, no more retries after this timeout")
daemonFlags.Parse(os.Args[2:])
if socketPath == "" {
socketPath = defaultSocketPath
}
if err := runDaemon(pidfilePath, hostPrefix, socketPath, timeout, resendMax, broadcast); err != nil {
if err := runDaemon(pidfilePath, hostPrefix, socketPath, timeout, resendMax, resendTimeout, broadcast); err != nil {
log.Print(err.Error())
os.Exit(1)
}