add DHCP IPAM plugin

The plugin binary actually functions in two modes. The first mode
is a regular CNI plugin. The second mode (when stared with "daemon" arg)
runs a DHCP client daemon. When executed as a CNI plugin, it issues
an RPC request to the daemon for actual processing. The daemon is
required since a DHCP lease needs to be maintained by periodically
renewing it. One instance of the daemon can server arbitrary number
of containers/leases.
This commit is contained in:
Eugene Yakubovich
2015-05-19 12:02:41 -07:00
parent 7a8ee49891
commit c70320b5ed
15 changed files with 791 additions and 838 deletions

157
plugins/ipam/dhcp/daemon.go Normal file
View File

@ -0,0 +1,157 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"path/filepath"
"runtime"
"sync"
"github.com/appc/cni/Godeps/_workspace/src/github.com/coreos/go-systemd/activation"
"github.com/appc/cni/pkg/plugin"
"github.com/appc/cni/pkg/skel"
)
const listenFdsStart = 3
const resendCount = 3
var errNoMoreTries = errors.New("no more tries")
type DHCP struct {
mux sync.Mutex
leases map[string]*DHCPLease
}
func newDHCP() *DHCP {
return &DHCP{
leases: make(map[string]*DHCPLease),
}
}
// Allocate acquires an IP from a DHCP server for a specified container.
// The acquired lease will be maintained until Release() is called.
func (d *DHCP) Allocate(args *skel.CmdArgs, result *plugin.Result) error {
conf := plugin.NetConf{}
if err := json.Unmarshal(args.StdinData, &conf); err != nil {
return fmt.Errorf("error parsing netconf: %v", err)
}
clientID := args.ContainerID + "/" + conf.Name
l, err := AcquireLease(clientID, args.Netns, args.IfName)
if err != nil {
return err
}
ipn, err := l.IPNet()
if err != nil {
l.Stop()
return err
}
d.setLease(args.ContainerID, conf.Name, l)
result.IP4 = &plugin.IPConfig{
IP: *ipn,
Gateway: l.Gateway(),
Routes: l.Routes(),
}
return nil
}
// Release stops maintenance of the lease acquired in Allocate()
// and sends a release msg to the DHCP server.
func (d *DHCP) Release(args *skel.CmdArgs, reply *struct{}) error {
conf := plugin.NetConf{}
if err := json.Unmarshal(args.StdinData, &conf); err != nil {
return fmt.Errorf("error parsing netconf: %v", err)
}
if l := d.getLease(args.ContainerID, conf.Name); l != nil {
l.Stop()
return nil
}
return fmt.Errorf("lease not found: %v/%v", args.ContainerID, conf.Name)
}
func (d *DHCP) getLease(contID, netName string) *DHCPLease {
d.mux.Lock()
defer d.mux.Unlock()
// TODO(eyakubovich): hash it to avoid collisions
l, ok := d.leases[contID+netName]
if !ok {
return nil
}
return l
}
func (d *DHCP) setLease(contID, netName string, l *DHCPLease) {
d.mux.Lock()
defer d.mux.Unlock()
// TODO(eyakubovich): hash it to avoid collisions
d.leases[contID+netName] = l
}
func getListener() (net.Listener, error) {
l, err := activation.Listeners(true)
if err != nil {
return nil, err
}
switch {
case len(l) == 0:
if err := os.MkdirAll(filepath.Dir(socketPath), 0700); err != nil {
return nil, err
}
return net.Listen("unix", socketPath)
case len(l) == 1:
if l[0] == nil {
return nil, fmt.Errorf("LISTEN_FDS=1 but no FD found")
}
return l[0], nil
default:
return nil, fmt.Errorf("Too many (%v) FDs passed through socket activation", len(l))
}
}
func runDaemon() {
// since other goroutines (on separate threads) will change namespaces,
// ensure the RPC server does not get scheduled onto those
runtime.LockOSThread()
l, err := getListener()
if err != nil {
log.Printf("Error getting listener: %v", err)
return
}
dhcp := newDHCP()
rpc.Register(dhcp)
rpc.HandleHTTP()
http.Serve(l, nil)
}

329
plugins/ipam/dhcp/lease.go Normal file
View File

@ -0,0 +1,329 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"log"
"math/rand"
"net"
"os"
"sync"
"time"
"github.com/appc/cni/Godeps/_workspace/src/github.com/d2g/dhcp4"
"github.com/appc/cni/Godeps/_workspace/src/github.com/d2g/dhcp4client"
"github.com/appc/cni/Godeps/_workspace/src/github.com/vishvananda/netlink"
"github.com/appc/cni/pkg/ns"
"github.com/appc/cni/pkg/plugin"
)
// RFC 2131 suggests using exponential backoff, starting with 4sec
// and randomized to +/- 1sec
const resendDelay0 = 4 * time.Second
const (
leaseStateBound = iota
leaseStateRenewing
leaseStateRebinding
)
// This implementation uses 1 OS thread per lease. This is because
// all the network operations have to be done in network namespace
// of the interface. This can be improved by switching to the proper
// namespace for network ops and using fewer threads. However, this
// needs to be done carefully as dhcp4client ops are blocking.
type DHCPLease struct {
clientID string
ack *dhcp4.Packet
opts dhcp4.Options
link netlink.Link
renewalTime time.Time
rebindingTime time.Time
expireTime time.Time
stop chan struct{}
wg sync.WaitGroup
}
// AcquireLease gets an DHCP lease and then maintains it in the background
// by periodically renewing it. The acquired lease can be released by
// calling DHCPLease.Stop()
func AcquireLease(clientID, netns, ifName string) (*DHCPLease, error) {
errCh := make(chan error, 1)
l := &DHCPLease{
clientID: clientID,
stop: make(chan struct{}),
}
log.Printf("%v: acquiring lease", clientID)
l.wg.Add(1)
go ns.WithNetNSPath(netns, true, func(_ *os.File) (e error) {
defer l.wg.Done()
link, err := netlink.LinkByName(ifName)
if err != nil {
errCh <- fmt.Errorf("error looking up %q", ifName)
return
}
l.link = link
if err = l.acquire(); err != nil {
errCh <- err
return
}
log.Printf("%v: lease acquired, expiration is %v", l.clientID, l.expireTime)
errCh <- nil
l.maintain()
return
})
err := <-errCh
if err != nil {
return nil, err
}
return l, nil
}
// Stop terminates the background task that maintains the lease
// and issues a DHCP Release
func (l *DHCPLease) Stop() {
close(l.stop)
l.wg.Wait()
}
func (l *DHCPLease) acquire() error {
c, err := newDHCPClient(l.link)
if err != nil {
return err
}
defer c.Close()
pkt, err := backoffRetry(func() (*dhcp4.Packet, error) {
ok, ack, err := c.Request()
switch {
case err != nil:
return nil, err
case !ok:
return nil, fmt.Errorf("DHCP server NACK'd own offer")
default:
return &ack, nil
}
})
if err != nil {
return err
}
return l.commit(pkt)
}
func (l *DHCPLease) commit(ack *dhcp4.Packet) error {
opts := ack.ParseOptions()
leaseTime, err := parseLeaseTime(opts)
if err != nil {
return err
}
rebindingTime, err := parseRebindingTime(opts)
if err != nil || rebindingTime > leaseTime {
// Per RFC 2131 Section 4.4.5, it should default to 85% of lease time
rebindingTime = leaseTime * 85 / 100
}
renewalTime, err := parseRenewalTime(opts)
if err != nil || renewalTime > rebindingTime {
// Per RFC 2131 Section 4.4.5, it should default to 50% of lease time
renewalTime = leaseTime / 2
}
now := time.Now()
l.expireTime = now.Add(leaseTime)
l.renewalTime = now.Add(renewalTime)
l.rebindingTime = now.Add(rebindingTime)
l.ack = ack
l.opts = opts
return nil
}
func (l *DHCPLease) maintain() {
state := leaseStateBound
for {
var sleepDur time.Duration
switch state {
case leaseStateBound:
sleepDur = l.renewalTime.Sub(time.Now())
if sleepDur <= 0 {
log.Printf("%v: renewing lease", l.clientID)
state = leaseStateRenewing
continue
}
case leaseStateRenewing:
if err := l.renew(); err != nil {
log.Printf("%v: %v", l.clientID, err)
if time.Now().After(l.rebindingTime) {
log.Printf("%v: renawal time expired, rebinding", l.clientID)
state = leaseStateRebinding
}
} else {
log.Printf("%v: lease renewed, expiration is %v", l.clientID, l.expireTime)
state = leaseStateBound
}
case leaseStateRebinding:
if err := l.acquire(); err != nil {
log.Printf("%v: %v", l.clientID, err)
if time.Now().After(l.expireTime) {
log.Printf("%v: lease expired, bringing interface DOWN", l.clientID)
l.downIface()
return
}
} else {
log.Printf("%v: lease rebound, expiration is %v", l.clientID, l.expireTime)
state = leaseStateBound
}
}
select {
case <-time.After(sleepDur):
case <-l.stop:
if err := l.release(); err != nil {
log.Printf("%v: failed to release DHCP lease: %v", l.clientID, err)
}
return
}
}
}
func (l *DHCPLease) downIface() {
if err := netlink.LinkSetDown(l.link); err != nil {
log.Printf("%v: failed to bring %v interface DOWN: %v", l.clientID, l.link.Attrs().Name, err)
}
}
func (l *DHCPLease) renew() error {
c, err := newDHCPClient(l.link)
if err != nil {
return err
}
defer c.Close()
pkt, err := backoffRetry(func() (*dhcp4.Packet, error) {
ok, ack, err := c.Renew(*l.ack)
switch {
case err != nil:
return nil, err
case !ok:
return nil, fmt.Errorf("DHCP server did not renew lease")
default:
return &ack, nil
}
})
if err != nil {
return err
}
l.commit(pkt)
return nil
}
func (l *DHCPLease) release() error {
log.Printf("%v: releasing lease", l.clientID)
c, err := newDHCPClient(l.link)
if err != nil {
return err
}
defer c.Close()
if err = c.Release(*l.ack); err != nil {
return fmt.Errorf("failed to send DHCPRELEASE")
}
return nil
}
func (l *DHCPLease) IPNet() (*net.IPNet, error) {
mask := parseSubnetMask(l.opts)
if mask == nil {
return nil, fmt.Errorf("DHCP option Subnet Mask not found in DHCPACK")
}
return &net.IPNet{
IP: l.ack.YIAddr(),
Mask: mask,
}, nil
}
func (l *DHCPLease) Gateway() net.IP {
return parseRouter(l.opts)
}
func (l *DHCPLease) Routes() []plugin.Route {
routes := parseRoutes(l.opts)
return append(routes, parseCIDRRoutes(l.opts)...)
}
// jitter returns a random value within [-span, span) range
func jitter(span time.Duration) time.Duration {
return time.Duration(float64(span) * (2.0*rand.Float64() - 1.0))
}
func backoffRetry(f func() (*dhcp4.Packet, error)) (*dhcp4.Packet, error) {
var baseDelay time.Duration = resendDelay0
for i := 0; i < resendCount; i++ {
pkt, err := f()
if err == nil {
return pkt, nil
}
log.Print(err)
time.Sleep(baseDelay + jitter(time.Second))
baseDelay *= 2
}
return nil, errNoMoreTries
}
func newDHCPClient(link netlink.Link) (*dhcp4client.Client, error) {
pktsock, err := dhcp4client.NewPacketSock(link.Attrs().Index)
if err != nil {
return nil, err
}
return dhcp4client.New(
dhcp4client.HardwareAddr(link.Attrs().HardwareAddr),
dhcp4client.Timeout(10*time.Second),
dhcp4client.Broadcast(false),
dhcp4client.Connection(pktsock),
)
}

64
plugins/ipam/dhcp/main.go Normal file
View File

@ -0,0 +1,64 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"net/rpc"
"os"
"github.com/appc/cni/pkg/plugin"
"github.com/appc/cni/pkg/skel"
)
const socketPath = "/run/cni/dhcp.sock"
func main() {
if len(os.Args) > 1 && os.Args[1] == "daemon" {
runDaemon()
} else {
skel.PluginMain(cmdAdd, cmdDel)
}
}
func cmdAdd(args *skel.CmdArgs) error {
client, err := rpc.DialHTTP("unix", socketPath)
if err != nil {
return fmt.Errorf("error dialing DHCP daemon: %v", err)
}
result := &plugin.Result{}
err = client.Call("DHCP.Allocate", args, result)
if err != nil {
return fmt.Errorf("error calling DHCP.Add: %v", err)
}
return plugin.PrintResult(result)
}
func cmdDel(args *skel.CmdArgs) error {
client, err := rpc.DialHTTP("unix", socketPath)
if err != nil {
return fmt.Errorf("error dialing DHCP daemon: %v", err)
}
dummy := struct{}{}
err = client.Call("DHCP.Release", args, &dummy)
if err != nil {
return fmt.Errorf("error calling DHCP.Del: %v", err)
}
return nil
}

View File

@ -0,0 +1,139 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"encoding/binary"
"fmt"
"net"
"time"
"github.com/appc/cni/Godeps/_workspace/src/github.com/d2g/dhcp4"
"github.com/appc/cni/pkg/plugin"
)
func parseRouter(opts dhcp4.Options) net.IP {
if opts, ok := opts[dhcp4.OptionRouter]; ok {
if len(opts) == 4 {
return net.IP(opts)
}
}
return nil
}
func classfulSubnet(sn net.IP) net.IPNet {
return net.IPNet{
IP: sn,
Mask: sn.DefaultMask(),
}
}
func parseRoutes(opts dhcp4.Options) []plugin.Route {
// StaticRoutes format: pairs of:
// Dest = 4 bytes; Classful IP subnet
// Router = 4 bytes; IP address of router
routes := []plugin.Route{}
if opt, ok := opts[dhcp4.OptionStaticRoute]; ok {
for len(opt) >= 8 {
sn := opt[0:4]
r := opt[4:8]
rt := plugin.Route{
Dst: classfulSubnet(sn),
GW: r,
}
routes = append(routes, rt)
opt = opt[8:]
}
}
return routes
}
func parseCIDRRoutes(opts dhcp4.Options) []plugin.Route {
// See RFC4332 for format (http://tools.ietf.org/html/rfc3442)
routes := []plugin.Route{}
if opt, ok := opts[dhcp4.OptionClasslessRouteFormat]; ok {
for len(opt) >= 5 {
width := int(opt[0])
if width > 32 {
// error: can't have more than /32
return nil
}
// network bits are compacted to avoid zeros
octets := 0
if width > 0 {
octets = (width-1)/8 + 1
}
if len(opt) < 1+octets+4 {
// error: too short
return nil
}
sn := make([]byte, 4)
copy(sn, opt[1:octets+1])
gw := net.IP(opt[octets+1 : octets+5])
rt := plugin.Route{
Dst: net.IPNet{
IP: net.IP(sn),
Mask: net.CIDRMask(width, 32),
},
GW: gw,
}
routes = append(routes, rt)
opt = opt[octets+5 : len(opt)]
}
}
return routes
}
func parseSubnetMask(opts dhcp4.Options) net.IPMask {
mask, ok := opts[dhcp4.OptionSubnetMask]
if !ok {
return nil
}
return net.IPMask(mask)
}
func parseDuration(opts dhcp4.Options, code dhcp4.OptionCode, optName string) (time.Duration, error) {
val, ok := opts[code]
if !ok {
return 0, fmt.Errorf("option %v not found", optName)
}
if len(val) != 4 {
return 0, fmt.Errorf("option %v is not 4 bytes", optName)
}
secs := binary.BigEndian.Uint32(val)
return time.Duration(secs) * time.Second, nil
}
func parseLeaseTime(opts dhcp4.Options) (time.Duration, error) {
return parseDuration(opts, dhcp4.OptionIPAddressLeaseTime, "LeaseTime")
}
func parseRenewalTime(opts dhcp4.Options) (time.Duration, error) {
return parseDuration(opts, dhcp4.OptionRenewalTimeValue, "RenewalTime")
}
func parseRebindingTime(opts dhcp4.Options) (time.Duration, error) {
return parseDuration(opts, dhcp4.OptionRebindingTimeValue, "RebindingTime")
}

View File

@ -0,0 +1,75 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"net"
"testing"
"github.com/appc/cni/Godeps/_workspace/src/github.com/d2g/dhcp4"
"github.com/appc/cni/pkg/plugin"
)
func validateRoutes(t *testing.T, routes []plugin.Route) {
expected := []plugin.Route{
plugin.Route{
Dst: net.IPNet{
IP: net.IPv4(10, 0, 0, 0),
Mask: net.CIDRMask(8, 32),
},
GW: net.IPv4(10, 1, 2, 3),
},
plugin.Route{
Dst: net.IPNet{
IP: net.IPv4(192, 168, 1, 0),
Mask: net.CIDRMask(24, 32),
},
GW: net.IPv4(192, 168, 2, 3),
},
}
if len(routes) != len(expected) {
t.Fatalf("wrong length slice; expected %v, got %v", len(expected), len(routes))
}
for i := 0; i < len(routes); i++ {
a := routes[i]
e := expected[i]
if a.Dst.String() != e.Dst.String() {
t.Errorf("route.Dst mismatch: expected %v, got %v", e.Dst, a.Dst)
}
if !a.GW.Equal(e.GW) {
t.Errorf("route.GW mismatch: expected %v, got %v", e.GW, a.GW)
}
}
}
func TestParseRoutes(t *testing.T) {
opts := make(dhcp4.Options)
opts[dhcp4.OptionStaticRoute] = []byte{10, 0, 0, 0, 10, 1, 2, 3, 192, 168, 1, 0, 192, 168, 2, 3}
routes := parseRoutes(opts)
validateRoutes(t, routes)
}
func TestParseCIDRRoutes(t *testing.T) {
opts := make(dhcp4.Options)
opts[dhcp4.OptionClasslessRouteFormat] = []byte{8, 10, 10, 1, 2, 3, 24, 192, 168, 1, 192, 168, 2, 3}
routes := parseCIDRRoutes(opts)
validateRoutes(t, routes)
}