plugins/meta/bandwidth: traffic shaping plugin

Add chained plugin to add a tbf qdisc to shape ingress/egress traffic
This commit is contained in:
DennisDenuto 2018-01-07 19:15:17 -08:00 committed by Gabriel Rosenhouse
parent 372bb5e826
commit b78e535055
8 changed files with 1236 additions and 5 deletions

View File

@ -68,7 +68,7 @@ var _ = Describe("Echosvr", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
defer conn.Close() defer conn.Close()
fmt.Fprintf(conn, "hello") fmt.Fprintf(conn, "hello\n")
Expect(ioutil.ReadAll(conn)).To(Equal([]byte("hello"))) Expect(ioutil.ReadAll(conn)).To(Equal([]byte("hello")))
}) })
}) })

View File

@ -7,8 +7,13 @@
package main package main
import ( import (
"bufio"
"fmt" "fmt"
"io"
"net" "net"
"os"
"strings"
"time"
) )
func main() { func main() {
@ -31,8 +36,22 @@ func main() {
} }
func handleConnection(conn net.Conn) { func handleConnection(conn net.Conn) {
buf := make([]byte, 512) conn.SetReadDeadline(time.Now().Add(1 * time.Minute))
nBytesRead, _ := conn.Read(buf) content, err := bufio.NewReader(conn).ReadString('\n')
conn.Write(buf[0:nBytesRead]) if err != nil && err != io.EOF {
conn.Close() fmt.Fprint(os.Stderr, err.Error())
return
}
conn.SetWriteDeadline(time.Now().Add(1 * time.Minute))
if _, err = conn.Write([]byte(strings.TrimSuffix(content, "\n"))); err != nil {
fmt.Fprint(os.Stderr, err.Error())
return
}
if err = conn.Close(); err != nil {
fmt.Fprint(os.Stderr, err.Error())
return
}
} }

View File

@ -8,3 +8,4 @@ plugins/main/ptp
plugins/main/vlan plugins/main/vlan
plugins/meta/portmap plugins/meta/portmap
plugins/meta/tuning plugins/meta/tuning
plugins/meta/bandwidth

View File

@ -0,0 +1,64 @@
# bandwidth plugin
## Overview
This plugin provides a way to use and configure Linux's Traffic control (tc) subsystem. tc encompasses the sets of mechanisms and operations by which packets are queued for transmission/reception on a network interface.
This plugin configures a token bucket filter (tbf) queuing discipline (qdisc) on both ingress and egress traffic. As a result, traffic is shaped when reading / writing.
Due to limitations on tc shaping rules for ingress, this plugin creates an Intermediate Functional Block device (ifb) to redirect packets from the host interface. tc tbf is then applied to the ifb device. The packets that were redirected to the ifb device are then written OUT (and shaped) to the host interface.
This plugin is only useful when used in addition to other plugins.
## Chaining
The bandwidth plugin applies traffic shaping to interfaces (as described above) created by previously applied plugins.
The following is an example [json configuration list](https://github.com/containernetworking/cni/blob/master/SPEC.md#network-configuration-list-runtime-examples) for creating a `ptp` between the host -> container via veth interfaces, whereby traffic is shaped by the `bandwidth` plugin:
```json
{
"cniVersion": "0.3.1",
"name": "mynet",
"plugins": [
{
"type": "ptp",
"ipMasq": true,
"mtu": 512,
"ipam": {
"type": "host-local",
"subnet": "10.0.0.0/24"
},
"dns": {
"nameservers": [ "10.1.0.1" ]
}
},
{
"name": "slowdown",
"type": "bandwidth",
"ingressRate": 123,
"ingressBurst": 456,
"egressRate": 123,
"egressBurst": 456
}
]
}
```
The result is an `ifb` device in the host namespace redirecting to the `host-interface`, with `tc tbf` applied on the `ifb` device and the `container-interface`.
## Network configuration reference
* ingressRate: is the rate in bits per second (bps) at which traffic can enter an interface. (See http://man7.org/linux/man-pages/man8/tbf.8.html)
* ingressBurst: is the maximum amount in bits that tokens can be made available for instantaneously. (See http://man7.org/linux/man-pages/man8/tbf.8.html)
* egressRate: is the rate in bits per second (bps) at which traffic can leave an interface. (See http://man7.org/linux/man-pages/man8/tbf.8.html)
* egressBurst: is the maximum amount in bits that tokens can be made available for instantaneously. (See http://man7.org/linux/man-pages/man8/tbf.8.html)
Both ingressRate and ingressBurst must be set in order to limit ingress bandwidth. If neither one is set, then ingress bandwidth is not limited. Setting only one of the two is rejected as an invalid configuration.
Both egressRate and egressBurst must be set in order to limit egress bandwidth. If neither one is set, then egress bandwidth is not limited. Setting only one of the two is rejected as an invalid configuration.
## tc tbf documentation
- [tldp traffic control](http://tldp.org/HOWTO/Traffic-Control-HOWTO/components.html)
- [man tbf](http://man7.org/linux/man-pages/man8/tbf.8.html)
- [tc ingress and ifb mirroring](https://serverfault.com/questions/350023/tc-ingress-policing-and-ifb-mirroring)

View File

@ -0,0 +1,565 @@
// Copyright 2018 CNI authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bandwidth
import (
"fmt"
"encoding/json"
"github.com/containernetworking/cni/pkg/invoke"
"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/containernetworking/plugins/pkg/testutils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
"github.com/vishvananda/netlink"
"net"
"time"
)
var _ = Describe("bandwidth test", func() {
var (
hostNs ns.NetNS
containerNs ns.NetNS
ifbDeviceName string
hostIfname string
containerIfname string
hostIP net.IP
containerIP net.IP
hostIfaceMTU int
)
BeforeEach(func() {
	// Fixed fixture values shared by every spec in this Describe.
	hostIfname = "host-veth"
	containerIfname = "container-veth"
	hostIP = net.IP{169, 254, 0, 1}
	containerIP = net.IP{10, 254, 0, 1}
	hostIfaceMTU = 1024
	// Name the specs expect for the ifb device; the plugin derives it from the
	// network name and container ID (see getIfbDeviceName).
	ifbDeviceName = "5b6c"

	// Two fresh network namespaces stand in for the host and the container.
	var nsErr error
	hostNs, nsErr = ns.NewNS()
	Expect(nsErr).NotTo(HaveOccurred())
	containerNs, nsErr = ns.NewNS()
	Expect(nsErr).NotTo(HaveOccurred())

	// Wire the two namespaces together with a veth pair carrying the fixture addresses.
	createVeth(hostNs.Path(), hostIfname, containerNs.Path(), containerIfname, hostIP, containerIP, hostIfaceMTU)
})
AfterEach(func() {
	// Close both namespaces before asserting, so that a failure while closing the
	// first one does not leave the second one open.
	containerCloseErr := containerNs.Close()
	hostCloseErr := hostNs.Close()

	// Surface clean-up failures instead of silently dropping them.
	Expect(containerCloseErr).NotTo(HaveOccurred())
	Expect(hostCloseErr).NotTo(HaveOccurred())
})
Describe("cmdADD", func() {
// Happy path: both ingress and egress limits are configured, so cmdAdd is expected to
// put a tbf qdisc on the host veth and to create an ifb device carrying a second tbf qdisc.
It("Works with a Veth pair using 0.3.0 config", func() {
conf := `{
"cniVersion": "0.3.0",
"name": "cni-plugin-bandwidth-test",
"type": "bandwidth",
"ingressRate": 8,
"ingressBurst": 8,
"egressRate": 16,
"egressBurst": 9,
"prevResult": {
"interfaces": [
{
"name": "%s",
"sandbox": ""
},
{
"name": "%s",
"sandbox": "%s"
}
],
"ips": [
{
"version": "4",
"address": "%s/24",
"gateway": "10.0.0.1",
"interface": 1
}
],
"routes": []
}
}`
// Fill in the host veth name, the container veth name, the container netns path and the container IP.
conf = fmt.Sprintf(conf, hostIfname, containerIfname, containerNs.Path(), containerIP.String())
args := &skel.CmdArgs{
ContainerID: "dummy",
Netns: containerNs.Path(),
IfName: containerIfname,
StdinData: []byte(conf),
}
// Run the plugin from inside the host namespace and inspect the egress side (ifb device + redirect filter).
Expect(hostNs.Do(func(netNS ns.NetNS) error {
defer GinkgoRecover()
r, out, err := testutils.CmdAddWithResult(containerNs.Path(), "", []byte(conf), func() error { return cmdAdd(args) })
Expect(err).NotTo(HaveOccurred(), string(out))
result, err := current.GetResult(r)
Expect(err).NotTo(HaveOccurred())
// The two interfaces from prevResult plus the ifb device appended by cmdAdd.
Expect(result.Interfaces).To(HaveLen(3))
Expect(result.Interfaces[2].Name).To(Equal(ifbDeviceName))
Expect(result.Interfaces[2].Sandbox).To(Equal(""))
ifbLink, err := netlink.LinkByName(ifbDeviceName)
Expect(err).NotTo(HaveOccurred())
// cmdAdd creates the ifb device with the MTU it read from the host interface.
Expect(ifbLink.Attrs().MTU).To(Equal(hostIfaceMTU))
qdiscs, err := netlink.QdiscList(ifbLink)
Expect(err).NotTo(HaveOccurred())
Expect(qdiscs).To(HaveLen(1))
Expect(qdiscs[0].Attrs().LinkIndex).To(Equal(ifbLink.Attrs().Index))
Expect(qdiscs[0]).To(BeAssignableToTypeOf(&netlink.Tbf{}))
// egressRate 16 becomes 2 because createTBF divides the configured rate by 8.
Expect(qdiscs[0].(*netlink.Tbf).Rate).To(Equal(uint64(2)))
Expect(qdiscs[0].(*netlink.Tbf).Limit).To(Equal(uint32(9)))
hostVethLink, err := netlink.LinkByName(hostIfname)
Expect(err).NotTo(HaveOccurred())
// The filter under handle ffff: on the host veth must point its mirred action at the ifb device.
qdiscFilters, err := netlink.FilterList(hostVethLink, netlink.MakeHandle(0xffff, 0))
Expect(err).NotTo(HaveOccurred())
Expect(qdiscFilters).To(HaveLen(1))
Expect(qdiscFilters[0].(*netlink.U32).Actions[0].(*netlink.MirredAction).Ifindex).To(Equal(ifbLink.Attrs().Index))
return nil
})).To(Succeed())
// Second pass: inspect the ingress side, i.e. the tbf qdisc placed directly on the host veth.
Expect(hostNs.Do(func(n ns.NetNS) error {
defer GinkgoRecover()
// NOTE: despite its name, ifbLink here is the host veth (it is looked up by hostIfname).
ifbLink, err := netlink.LinkByName(hostIfname)
Expect(err).NotTo(HaveOccurred())
qdiscs, err := netlink.QdiscList(ifbLink)
Expect(err).NotTo(HaveOccurred())
Expect(qdiscs).To(HaveLen(2))
Expect(qdiscs[0].Attrs().LinkIndex).To(Equal(ifbLink.Attrs().Index))
Expect(qdiscs[0]).To(BeAssignableToTypeOf(&netlink.Tbf{}))
// ingressRate 8 becomes 1 after createTBF's division by 8.
Expect(qdiscs[0].(*netlink.Tbf).Rate).To(Equal(uint64(1)))
Expect(qdiscs[0].(*netlink.Tbf).Limit).To(Equal(uint32(8)))
return nil
})).To(Succeed())
})
// Ingress rate and burst are both 0 (disabled) while egress is configured: the ifb device
// must still be created, but no tbf qdisc may end up on the host veth.
It("Does not apply ingress when disabled", func() {
conf := `{
"cniVersion": "0.3.0",
"name": "cni-plugin-bandwidth-test",
"type": "bandwidth",
"ingressRate": 0,
"ingressBurst": 0,
"egressRate": 8,
"egressBurst": 1,
"prevResult": {
"interfaces": [
{
"name": "%s",
"sandbox": ""
},
{
"name": "%s",
"sandbox": "%s"
}
],
"ips": [
{
"version": "4",
"address": "%s/24",
"gateway": "10.0.0.1",
"interface": 1
}
],
"routes": []
}
}`
// Fill in the host veth name, the container veth name, the container netns path and the container IP.
conf = fmt.Sprintf(conf, hostIfname, containerIfname, containerNs.Path(), containerIP.String())
args := &skel.CmdArgs{
ContainerID: "dummy",
Netns: containerNs.Path(),
IfName: containerIfname,
StdinData: []byte(conf),
}
Expect(hostNs.Do(func(netNS ns.NetNS) error {
defer GinkgoRecover()
_, out, err := testutils.CmdAddWithResult(containerNs.Path(), ifbDeviceName, []byte(conf), func() error { return cmdAdd(args) })
Expect(err).NotTo(HaveOccurred(), string(out))
// Egress shaping is still enabled, so the ifb device must exist.
_, err = netlink.LinkByName(ifbDeviceName)
Expect(err).NotTo(HaveOccurred())
return nil
})).To(Succeed())
Expect(hostNs.Do(func(n ns.NetNS) error {
defer GinkgoRecover()
// NOTE: containerIfLink is actually the host-side veth (it is looked up by hostIfname).
containerIfLink, err := netlink.LinkByName(hostIfname)
Expect(err).NotTo(HaveOccurred())
qdiscs, err := netlink.QdiscList(containerIfLink)
Expect(err).NotTo(HaveOccurred())
// Two qdiscs are listed and neither is a tbf; presumably these are the default qdisc
// and the ingress qdisc added by CreateEgressQdisc — TODO confirm.
Expect(qdiscs).To(HaveLen(2))
Expect(qdiscs[0]).NotTo(BeAssignableToTypeOf(&netlink.Tbf{}))
Expect(qdiscs[1]).NotTo(BeAssignableToTypeOf(&netlink.Tbf{}))
return nil
})).To(Succeed())
})
// Egress rate and burst are both 0 (disabled) while ingress is configured: no ifb device
// may be created, and the host veth must carry exactly one tbf qdisc.
It("Does not apply egress when disabled", func() {
conf := `{
"cniVersion": "0.3.0",
"name": "cni-plugin-bandwidth-test",
"type": "bandwidth",
"egressRate": 0,
"egressBurst": 0,
"ingressRate": 8,
"ingressBurst": 1,
"prevResult": {
"interfaces": [
{
"name": "%s",
"sandbox": ""
},
{
"name": "%s",
"sandbox": "%s"
}
],
"ips": [
{
"version": "4",
"address": "%s/24",
"gateway": "10.0.0.1",
"interface": 1
}
],
"routes": []
}
}`
// Fill in the host veth name, the container veth name, the container netns path and the container IP.
conf = fmt.Sprintf(conf, hostIfname, containerIfname, containerNs.Path(), containerIP.String())
args := &skel.CmdArgs{
ContainerID: "dummy",
Netns: containerNs.Path(),
IfName: containerIfname,
StdinData: []byte(conf),
}
Expect(hostNs.Do(func(netNS ns.NetNS) error {
defer GinkgoRecover()
_, out, err := testutils.CmdAddWithResult(containerNs.Path(), ifbDeviceName, []byte(conf), func() error { return cmdAdd(args) })
Expect(err).NotTo(HaveOccurred(), string(out))
// With egress disabled cmdAdd never creates the ifb device, so the lookup must fail.
_, err = netlink.LinkByName(ifbDeviceName)
Expect(err).To(HaveOccurred())
return nil
})).To(Succeed())
Expect(hostNs.Do(func(n ns.NetNS) error {
defer GinkgoRecover()
// NOTE: containerIfLink is actually the host-side veth (it is looked up by hostIfname).
containerIfLink, err := netlink.LinkByName(hostIfname)
Expect(err).NotTo(HaveOccurred())
qdiscs, err := netlink.QdiscList(containerIfLink)
Expect(err).NotTo(HaveOccurred())
// Only the ingress tbf qdisc is present on the host veth.
Expect(qdiscs).To(HaveLen(1))
Expect(qdiscs[0].Attrs().LinkIndex).To(Equal(containerIfLink.Attrs().Index))
Expect(qdiscs[0]).To(BeAssignableToTypeOf(&netlink.Tbf{}))
// ingressRate 8 becomes 1 after createTBF's division by 8.
Expect(qdiscs[0].(*netlink.Tbf).Rate).To(Equal(uint64(1)))
Expect(qdiscs[0].(*netlink.Tbf).Limit).To(Equal(uint32(1)))
return nil
})).To(Succeed())
})
// Validation failure: ingressBurst is set while ingressRate is 0, which parseConfig rejects
// through validateRateAndBurst before any device is touched.
It("fails an invalid ingress config", func() {
conf := `{
"cniVersion": "0.3.0",
"name": "cni-plugin-bandwidth-test",
"type": "bandwidth",
"ingressRate": 0,
"ingressBurst": 123,
"egressRate": 123,
"egressBurst": 123,
"prevResult": {
"interfaces": [
{
"name": "%s",
"sandbox": ""
},
{
"name": "%s",
"sandbox": "%s"
}
],
"ips": [
{
"version": "4",
"address": "%s/24",
"gateway": "10.0.0.1",
"interface": 1
}
],
"routes": []
}
}`
// Fill in the host veth name, the container veth name, the container netns path and the container IP.
conf = fmt.Sprintf(conf, hostIfname, containerIfname, containerNs.Path(), containerIP.String())
args := &skel.CmdArgs{
ContainerID: "dummy",
Netns: containerNs.Path(),
IfName: "eth0",
StdinData: []byte(conf),
}
Expect(hostNs.Do(func(netNS ns.NetNS) error {
defer GinkgoRecover()
_, _, err := testutils.CmdAddWithResult(containerNs.Path(), "", []byte(conf), func() error { return cmdAdd(args) })
// The message must match the one returned by validateRateAndBurst for rate == 0, burst != 0.
Expect(err).To(MatchError("if burst is set, rate must also be set"))
return nil
})).To(Succeed())
})
})
Describe("cmdDEL", func() {
// ADD followed by DEL with both directions configured: after cmdDel the ifb device
// created by cmdAdd must no longer exist.
It("Works with a Veth pair using 0.3.0 config", func() {
conf := `{
"cniVersion": "0.3.0",
"name": "cni-plugin-bandwidth-test",
"type": "bandwidth",
"ingressRate": 8,
"ingressBurst": 8,
"egressRate": 9,
"egressBurst": 9,
"prevResult": {
"interfaces": [
{
"name": "%s",
"sandbox": ""
},
{
"name": "%s",
"sandbox": "%s"
}
],
"ips": [
{
"version": "4",
"address": "%s/24",
"gateway": "10.0.0.1",
"interface": 1
}
],
"routes": []
}
}`
// Fill in the host veth name, the container veth name, the container netns path and the container IP.
conf = fmt.Sprintf(conf, hostIfname, containerIfname, containerNs.Path(), containerIP.String())
args := &skel.CmdArgs{
ContainerID: "dummy",
Netns: containerNs.Path(),
IfName: containerIfname,
StdinData: []byte(conf),
}
Expect(hostNs.Do(func(netNS ns.NetNS) error {
defer GinkgoRecover()
_, out, err := testutils.CmdAddWithResult(containerNs.Path(), "", []byte(conf), func() error { return cmdAdd(args) })
Expect(err).NotTo(HaveOccurred(), string(out))
err = testutils.CmdDelWithResult(containerNs.Path(), "", func() error { return cmdDel(args) })
// NOTE(review): `out` still holds the output captured from the ADD call above.
Expect(err).NotTo(HaveOccurred(), string(out))
// cmdDel tears the ifb device down, so looking it up must now fail.
_, err = netlink.LinkByName(ifbDeviceName)
Expect(err).To(HaveOccurred())
return nil
})).To(Succeed())
})
})
Context("when chaining bandwidth plugin with PTP using 0.3.0 config", func() {
var ptpConf string
var rateInBits int
var burstInBits int
var packetInBytes int
var containerWithoutTbfNS ns.NetNS
var containerWithTbfNS ns.NetNS
var portServerWithTbf int
var portServerWithoutTbf int
var containerWithTbfRes types.Result
var containerWithoutTbfRes types.Result
var echoServerWithTbf *gexec.Session
var echoServerWithoutTbf *gexec.Session
BeforeEach(func() {
	// Shaping parameters for the limited container: a rate of 1000 bytes (8000 bits),
	// a burst of twice the rate in bits, and a payload 25 times the per-second rate.
	rateInBytes := 1000
	rateInBits = rateInBytes * 8
	burstInBits = rateInBits * 2
	packetInBytes = rateInBytes * 25

	ptpConf = `{
"cniVersion": "0.3.0",
"name": "mynet",
"type": "ptp",
"ipMasq": true,
"mtu": 512,
"ipam": {
"type": "host-local",
"subnet": "10.1.2.0/24"
}
}`

	containerWithTbfIFName := "ptp0"
	containerWithoutTbfIFName := "ptp1"

	var err error
	containerWithTbfNS, err = ns.NewNS()
	Expect(err).NotTo(HaveOccurred())
	containerWithoutTbfNS, err = ns.NewNS()
	Expect(err).NotTo(HaveOccurred())

	// delegatePtpAdd invokes the ptp plugin with ptpConf and prints its result so that
	// CmdAddWithResult can capture it. The error is checked before the result is
	// touched: printing the result of a failed delegate call would dereference a
	// value that is not meaningful.
	delegatePtpAdd := func() error {
		r, err := invoke.DelegateAdd("ptp", []byte(ptpConf))
		if err != nil {
			return err
		}
		Expect(r.Print()).To(Succeed())
		return nil
	}

	By("create two containers, and use the bandwidth plugin on one of them")
	Expect(hostNs.Do(func(ns.NetNS) error {
		defer GinkgoRecover()

		containerWithTbfRes, _, err = testutils.CmdAddWithResult(containerWithTbfNS.Path(), containerWithTbfIFName, []byte(ptpConf), delegatePtpAdd)
		Expect(err).NotTo(HaveOccurred())

		containerWithoutTbfRes, _, err = testutils.CmdAddWithResult(containerWithoutTbfNS.Path(), containerWithoutTbfIFName, []byte(ptpConf), delegatePtpAdd)
		Expect(err).NotTo(HaveOccurred())

		containerWithTbfResult, err := current.GetResult(containerWithTbfRes)
		Expect(err).NotTo(HaveOccurred())

		// Build the bandwidth plugin config by hand, chaining it onto the ptp result
		// of the container that is going to be shaped.
		tbfPluginConf := PluginConf{
			IngressBurst: burstInBits,
			IngressRate:  rateInBits,
			EgressBurst:  burstInBits,
			EgressRate:   rateInBits,
		}
		tbfPluginConf.Name = "mynet"
		tbfPluginConf.CNIVersion = "0.3.0"
		tbfPluginConf.Type = "bandwidth"
		tbfPluginConf.RawPrevResult = &map[string]interface{}{
			"ips":        containerWithTbfResult.IPs,
			"interfaces": containerWithTbfResult.Interfaces,
		}
		tbfPluginConf.PrevResult = &current.Result{
			IPs:        containerWithTbfResult.IPs,
			Interfaces: containerWithTbfResult.Interfaces,
		}

		conf, err := json.Marshal(tbfPluginConf)
		Expect(err).NotTo(HaveOccurred())

		args := &skel.CmdArgs{
			ContainerID: "dummy",
			Netns:       containerWithTbfNS.Path(),
			IfName:      containerWithTbfIFName,
			StdinData:   conf,
		}

		_, out, err := testutils.CmdAddWithResult(containerWithTbfNS.Path(), "", conf, func() error { return cmdAdd(args) })
		Expect(err).NotTo(HaveOccurred(), string(out))

		return nil
	})).To(Succeed())

	By("starting a tcp server on both containers")
	portServerWithTbf, echoServerWithTbf, err = startEchoServerInNamespace(containerWithTbfNS)
	Expect(err).NotTo(HaveOccurred())
	portServerWithoutTbf, echoServerWithoutTbf, err = startEchoServerInNamespace(containerWithoutTbfNS)
	Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
	// Release the two container namespaces created by the matching BeforeEach.
	containerWithTbfNS.Close()
	containerWithoutTbfNS.Close()

	// Stop whichever echo servers were actually started; a session is nil when
	// set-up failed before reaching it.
	for _, server := range []*gexec.Session{echoServerWithoutTbf, echoServerWithTbf} {
		if server != nil {
			server.Kill()
		}
	}
})
// Times the same TCP echo exchange (packetInBytes of payload sent from the host namespace)
// against the shaped and the unshaped container, and requires the shaped one to be slower
// by more than one second. The final argument (1) is the sample count passed to Measure.
Measure("limits ingress traffic on veth device", func(b Benchmarker) {
var runtimeWithLimit time.Duration
var runtimeWithoutLimit time.Duration
By("gather timing statistics about both containers")
By("sending tcp traffic to the container that has traffic shaped", func() {
runtimeWithLimit = b.Time("with tbf", func() {
result, err := current.GetResult(containerWithTbfRes)
Expect(err).NotTo(HaveOccurred())
// The first IP of the ptp result is used as the address of the echo server.
makeTcpClientInNS(hostNs.Path(), result.IPs[0].Address.IP.String(), portServerWithTbf, packetInBytes)
})
})
By("sending tcp traffic to the container that does not have traffic shaped", func() {
runtimeWithoutLimit = b.Time("without tbf", func() {
result, err := current.GetResult(containerWithoutTbfRes)
Expect(err).NotTo(HaveOccurred())
makeTcpClientInNS(hostNs.Path(), result.IPs[0].Address.IP.String(), portServerWithoutTbf, packetInBytes)
})
})
// Shaping must add more than a full second on top of the unshaped run.
Expect(runtimeWithLimit).To(BeNumerically(">", runtimeWithoutLimit+1000*time.Millisecond))
}, 1)
})
})

View File

@ -0,0 +1,199 @@
package bandwidth
// Copyright 2018 CNI authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"bytes"
"fmt"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"github.com/vishvananda/netlink"
"io"
"net"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
)
// TestTBF is the `go test` entry point for this package's Ginkgo specs: it registers
// Ginkgo's Fail as the Gomega fail handler and runs the specs as "bandwidth suite".
func TestTBF(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "bandwidth suite")
}
// echoServerBinaryPath is the location of the echo server binary built for the suite.
var echoServerBinaryPath string

// Build the echo server test binary before the suite runs. The first function returns
// the built path as bytes; the second stores the bytes it is handed in
// echoServerBinaryPath.
var _ = SynchronizedBeforeSuite(func() []byte {
	builtPath, buildErr := gexec.Build("github.com/containernetworking/plugins/pkg/testutils/echosvr")
	Expect(buildErr).NotTo(HaveOccurred())
	return []byte(builtPath)
}, func(payload []byte) {
	echoServerBinaryPath = string(payload)
})

// Remove the artifacts produced by gexec.Build once the suite has finished.
var _ = SynchronizedAfterSuite(func() {}, func() {
	gexec.CleanupBuildArtifacts()
})
// startInNetNS starts binPath inside the given network namespace via `ip netns exec`
// and returns the gexec session, with stdout and stderr forwarded to GinkgoWriter.
func startInNetNS(binPath string, netNS ns.NetNS) (*gexec.Session, error) {
	// `ip netns exec` addresses namespaces by name, so this relies on the namespace
	// path living in /var/run/netns where that command can find it.
	nsName := filepath.Base(netNS.Path())

	return gexec.Start(
		exec.Command("ip", "netns", "exec", nsName, binPath),
		GinkgoWriter,
		GinkgoWriter,
	)
}
// startEchoServerInNamespace launches the echo server binary inside netNS and returns
// the TCP port it announced on stdout together with the running session. Failures are
// reported through Gomega expectations, so the returned error is always nil.
func startEchoServerInNamespace(netNS ns.NetNS) (int, *gexec.Session, error) {
	server, startErr := startInNetNS(echoServerBinaryPath, netNS)
	Expect(startErr).NotTo(HaveOccurred())

	// Wait for the server to print its address (terminated by a newline) on stdout.
	Eventually(server.Out).Should(gbytes.Say("\n"))
	announced := strings.TrimSpace(string(server.Out.Contents()))

	_, portText, splitErr := net.SplitHostPort(announced)
	Expect(splitErr).NotTo(HaveOccurred())

	port, convErr := strconv.Atoi(portText)
	Expect(convErr).NotTo(HaveOccurred())

	// Keep relaying the server's output to Ginkgo so that any errors it reports later
	// show up in the test log.
	go func() {
		io.Copy(GinkgoWriter, io.MultiReader(server.Out, server.Err))
	}()

	return port, server, nil
}
// makeTcpClientInNS sends numBytes of 'a' to address:port using `nc` and expects the
// exact same bytes back on stdout. When netns is non-empty the client is run inside
// that namespace through `ip netns exec` (using the namespace's base name); otherwise
// nc is run directly.
func makeTcpClientInNS(netns string, address string, port int, numBytes int) {
	payload := bytes.Repeat([]byte{'a'}, numBytes)

	ncPath, err := exec.LookPath("nc")
	Expect(err).NotTo(HaveOccurred())

	portArg := strconv.Itoa(port)

	var cmd *exec.Cmd
	switch netns {
	case "":
		cmd = exec.Command("nc", address, portArg)
	default:
		cmd = exec.Command("ip", "netns", "exec", filepath.Base(netns), ncPath, "-v", address, portArg)
	}
	cmd.Stdin = bytes.NewBuffer(payload)
	cmd.Stderr = GinkgoWriter

	echoed, err := cmd.Output()
	Expect(err).NotTo(HaveOccurred())
	Expect(string(echoed)).To(Equal(string(payload)))
}
// createVeth creates a veth pair with one end (hostVethIfName, MTU hostIfaceMTU) in the
// namespace at hostNamespace and the other end (containerVethIfName) moved into the
// namespace at containerNamespace. Each end gets a /32 link-scope address whose peer is
// the opposite end's IP. Any failure is reported through a Gomega expectation.
func createVeth(hostNamespace string, hostVethIfName string, containerNamespace string, containerVethIfName string, hostIP []byte, containerIP []byte, hostIfaceMTU int) {
	vethDeviceRequest := &netlink.Veth{
		LinkAttrs: netlink.LinkAttrs{
			Name:  hostVethIfName,
			Flags: net.FlagUp,
			MTU:   hostIfaceMTU,
		},
		PeerName: containerVethIfName,
	}

	// Open both namespace handles up front, check both errors, and release the
	// handles when done so no file descriptors are leaked.
	hostNs, err := ns.GetNS(hostNamespace)
	Expect(err).NotTo(HaveOccurred())
	defer hostNs.Close()

	containerNs, err := ns.GetNS(containerNamespace)
	Expect(err).NotTo(HaveOccurred())
	defer containerNs.Close()

	// assignPeerAddr gives the link named ifName, in the current namespace, a /32
	// link-scope address `local` whose peer is `peer`.
	assignPeerAddr := func(ifName string, local, peer net.IP) error {
		localAddr := &net.IPNet{
			IP:   local,
			Mask: []byte{255, 255, 255, 255},
		}
		addr, err := netlink.ParseAddr(localAddr.String())
		if err != nil {
			return fmt.Errorf("parsing address %s: %s", localAddr, err)
		}
		addr.Peer = &net.IPNet{
			IP:   peer,
			Mask: []byte{255, 255, 255, 255},
		}
		addr.Scope = int(netlink.SCOPE_LINK)

		link, err := netlink.LinkByName(ifName)
		if err != nil {
			// Report the interface that was actually looked up.
			return fmt.Errorf("failed to find newly-created veth device %q: %v", ifName, err)
		}
		if err := netlink.AddrAdd(link, addr); err != nil {
			return fmt.Errorf("adding IP address %s: %s", localAddr, err)
		}
		return nil
	}

	err = hostNs.Do(func(_ ns.NetNS) error {
		if err := netlink.LinkAdd(vethDeviceRequest); err != nil {
			return fmt.Errorf("creating veth pair: %s", err)
		}

		// Both ends are created in the host namespace; move the container end away.
		containerVeth, err := netlink.LinkByName(containerVethIfName)
		if err != nil {
			return fmt.Errorf("failed to find newly-created veth device %q: %v", containerVethIfName, err)
		}
		if err := netlink.LinkSetNsFd(containerVeth, int(containerNs.Fd())); err != nil {
			return fmt.Errorf("failed to move veth to container namespace: %s", err)
		}

		return assignPeerAddr(hostVethIfName, hostIP, containerIP)
	})
	Expect(err).NotTo(HaveOccurred())

	err = containerNs.Do(func(_ ns.NetNS) error {
		return assignPeerAddr(containerVethIfName, containerIP, hostIP)
	})
	Expect(err).NotTo(HaveOccurred())
}

View File

@ -0,0 +1,165 @@
package bandwidth
// Copyright 2018 CNI authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import (
"fmt"
"github.com/containernetworking/plugins/pkg/ip"
"github.com/vishvananda/netlink"
"net"
"syscall"
)
// latencyInMillis is the latency, in milliseconds, that createTBF converts with
// latencyInUsec and feeds into the tbf limit calculation.
const latencyInMillis = 25
// CreateIfb adds an ifb link named ifbDeviceName with the given MTU, requesting that it
// be brought up. It returns a wrapped error if the link cannot be added.
func CreateIfb(ifbDeviceName string, mtu int) error {
	ifb := &netlink.Ifb{
		LinkAttrs: netlink.LinkAttrs{
			Name:  ifbDeviceName,
			Flags: net.FlagUp,
			MTU:   mtu,
		},
	}

	if addErr := netlink.LinkAdd(ifb); addErr != nil {
		return fmt.Errorf("adding link: %s", addErr)
	}
	return nil
}
// TeardownIfb deletes the link named deviceName. A link that does not exist
// (ip.ErrLinkNotFound) is treated as already torn down and is not an error.
func TeardownIfb(deviceName string) error {
	switch _, err := ip.DelLinkByNameAddr(deviceName); err {
	case nil, ip.ErrLinkNotFound:
		return nil
	default:
		return err
	}
}
// CreateIngressQdisc installs a tbf qdisc with the given rate and burst directly on the
// device named hostDeviceName.
func CreateIngressQdisc(rateInBits, burstInBits int, hostDeviceName string) error {
	link, lookupErr := netlink.LinkByName(hostDeviceName)
	if lookupErr != nil {
		return fmt.Errorf("get host device: %s", lookupErr)
	}

	linkIndex := link.Attrs().Index
	return createTBF(rateInBits, burstInBits, linkIndex)
}
// CreateEgressQdisc shapes traffic arriving on the host device by redirecting it to the
// ifb device and throttling it there. It performs three steps:
//  1. adds an ingress qdisc (handle ffff:) on the host device,
//  2. adds a u32 filter on that qdisc whose mirred action redirects packets to the ifb device,
//  3. installs a tbf qdisc with the given rate and burst on the ifb device.
func CreateEgressQdisc(rateInBits, burstInBits int, hostDeviceName string, ifbDeviceName string) error {
	ifbLink, err := netlink.LinkByName(ifbDeviceName)
	if err != nil {
		return fmt.Errorf("get ifb device: %s", err)
	}
	hostLink, err := netlink.LinkByName(hostDeviceName)
	if err != nil {
		return fmt.Errorf("get host device: %s", err)
	}
	ifbIndex := ifbLink.Attrs().Index
	hostIndex := hostLink.Attrs().Index

	// Step 1: ingress qdisc on the host device.
	ingressQdisc := &netlink.Ingress{
		QdiscAttrs: netlink.QdiscAttrs{
			LinkIndex: hostIndex,
			Handle:    netlink.MakeHandle(0xffff, 0), // ffff:
			Parent:    netlink.HANDLE_INGRESS,
		},
	}
	if err := netlink.QdiscAdd(ingressQdisc); err != nil {
		return fmt.Errorf("create ingress qdisc: %s", err)
	}

	// Step 2: match every protocol and redirect to the ifb device.
	redirect := &netlink.U32{
		FilterAttrs: netlink.FilterAttrs{
			LinkIndex: hostIndex,
			Parent:    ingressQdisc.QdiscAttrs.Handle,
			Priority:  1,
			Protocol:  syscall.ETH_P_ALL,
		},
		ClassId:    netlink.MakeHandle(1, 1),
		RedirIndex: ifbIndex,
		Actions: []netlink.Action{
			&netlink.MirredAction{
				ActionAttrs:  netlink.ActionAttrs{},
				MirredAction: netlink.TCA_EGRESS_REDIR,
				Ifindex:      ifbIndex,
			},
		},
	}
	if err := netlink.FilterAdd(redirect); err != nil {
		return fmt.Errorf("add filter: %s", err)
	}

	// Step 3: throttle the redirected traffic on the ifb device.
	if err := createTBF(rateInBits, burstInBits, ifbIndex); err != nil {
		return fmt.Errorf("create ifb qdisc: %s", err)
	}
	return nil
}
// createTBF installs a root tbf qdisc (handle 1:) on the link with index linkIndex.
// Roughly equivalent to:
//
//	tc qdisc add dev <link> root tbf rate <rate> burst <burst>
//
// The rate is divided by 8 before being programmed into the qdisc. Non-positive rate
// or burst values are rejected.
//
// NOTE(review): burstInBits is handed to buffer() without the same division by 8 that
// is applied to the rate — confirm the intended unit of the burst value.
// NOTE(review): a rate below 8 truncates to 0 here, which buffer() then divides by —
// confirm whether such rates need to be rejected.
func createTBF(rateInBits, burstInBits, linkIndex int) error {
	switch {
	case rateInBits <= 0:
		return fmt.Errorf("invalid rate: %d", rateInBits)
	case burstInBits <= 0:
		return fmt.Errorf("invalid burst: %d", burstInBits)
	}

	bytesPerSec := uint64(rateInBits / 8)
	bufferTicks := buffer(bytesPerSec, uint32(burstInBits))
	limitBytes := limit(bytesPerSec, latencyInUsec(latencyInMillis), bufferTicks)

	tbf := &netlink.Tbf{
		QdiscAttrs: netlink.QdiscAttrs{
			LinkIndex: linkIndex,
			Handle:    netlink.MakeHandle(1, 0),
			Parent:    netlink.HANDLE_ROOT,
		},
		Limit:  limitBytes,
		Rate:   bytesPerSec,
		Buffer: bufferTicks,
	}
	if err := netlink.QdiscAdd(tbf); err != nil {
		return fmt.Errorf("create qdisc: %s", err)
	}
	return nil
}
// tick2Time converts a tick count into time units by dividing it by
// netlink.TickInUsec(); the result is truncated to uint32.
func tick2Time(tick uint32) uint32 {
	ticksPerUnit := float64(netlink.TickInUsec())
	return uint32(float64(tick) / ticksPerUnit)
}
// time2Tick converts time units into ticks by multiplying by netlink.TickInUsec();
// the result is truncated to uint32.
func time2Tick(time uint32) uint32 {
	ticksPerUnit := float64(netlink.TickInUsec())
	return uint32(float64(time) * ticksPerUnit)
}
func buffer(rate uint64, burst uint32) uint32 {
return time2Tick(uint32(float64(burst) * float64(netlink.TIME_UNITS_PER_SEC) / float64(rate)))
}
// limit computes the tbf limit as the amount that can be sent at `rate` during
// `latency` plus the time represented by `buffer` (given in ticks).
func limit(rate uint64, latency float64, buffer uint32) uint32 {
	ratePerUnit := float64(rate) / float64(netlink.TIME_UNITS_PER_SEC)
	window := latency + float64(tick2Time(buffer))
	return uint32(ratePerUnit * window)
}
// latencyInUsec converts a latency in milliseconds into netlink time units
// (TIME_UNITS_PER_SEC per second).
func latencyInUsec(latencyInMillis float64) float64 {
	seconds := latencyInMillis / 1000.0
	return float64(netlink.TIME_UNITS_PER_SEC) * seconds
}

View File

@ -0,0 +1,218 @@
// Copyright 2018 CNI authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bandwidth
import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/cni/pkg/version"
"github.com/vishvananda/netlink"
)
// PluginConf is the configuration the bandwidth plugin reads from stdin: the common
// CNI network configuration plus the previous plugin's result and the shaping limits.
type PluginConf struct {
types.NetConf
RuntimeConfig *struct{} `json:"runtimeConfig"`
// RawPrevResult is the previous plugin's result exactly as received in the
// "prevResult" key. parseConfig decodes it according to the configured CNI
// version, stores the converted value in PrevResult and then clears this field.
RawPrevResult *map[string]interface{} `json:"prevResult"`
// PrevResult is the decoded previous result; it is never read from or written to JSON.
PrevResult *current.Result `json:"-"`
// Plugin-specific settings. Within a direction, rate and burst must either both be
// set or both be 0 (see validateRateAndBurst); 0 means no limit in that direction.
// Rates are in bits: createTBF divides them by 8 before programming the qdisc.
// NOTE(review): the burst values are named "bits" by the qdisc helpers but are not
// divided by 8 there — confirm the intended unit.
IngressRate int `json:"ingressRate"` // Ingress rate limit; 0 for no limit. If set, ingressBurst must also be set.
IngressBurst int `json:"ingressBurst"` // Ingress burst size; 0 for no limit. If set, ingressRate must also be set.
EgressRate int `json:"egressRate"` // Egress rate limit; 0 for no limit. If set, egressBurst must also be set.
EgressBurst int `json:"egressBurst"` // Egress burst size; 0 for no limit. If set, egressRate must also be set.
}
// parseConfig decodes the plugin configuration supplied on stdin. When a prevResult is
// present it is re-serialized, parsed according to the configured CNI version and
// converted to the current result type. The ingress and egress rate/burst pairs are
// validated before the configuration is returned.
func parseConfig(stdin []byte) (*PluginConf, error) {
	conf := PluginConf{}
	if err := json.Unmarshal(stdin, &conf); err != nil {
		return nil, fmt.Errorf("failed to parse network configuration: %v", err)
	}

	// Decode the chained plugin's previous result, if one was supplied.
	if raw := conf.RawPrevResult; raw != nil {
		serialized, err := json.Marshal(raw)
		if err != nil {
			return nil, fmt.Errorf("could not serialize prevResult: %v", err)
		}
		versioned, err := version.NewResult(conf.CNIVersion, serialized)
		if err != nil {
			return nil, fmt.Errorf("could not parse prevResult: %v", err)
		}
		conf.RawPrevResult = nil
		conf.PrevResult, err = current.NewResultFromResult(versioned)
		if err != nil {
			return nil, fmt.Errorf("could not convert result to current version: %v", err)
		}
	}

	// Validate ingress first, then egress, returning the first problem found.
	if err := validateRateAndBurst(conf.IngressRate, conf.IngressBurst); err != nil {
		return nil, err
	}
	if err := validateRateAndBurst(conf.EgressRate, conf.EgressBurst); err != nil {
		return nil, err
	}

	return &conf, nil
}
// validateRateAndBurst checks one rate/burst pair. Negative values are rejected, and
// the two values must either both be zero or both be non-zero. It returns nil when the
// pair is acceptable.
func validateRateAndBurst(rate int, burst int) error {
	// Negative values take precedence over the "only one is set" checks.
	if burst < 0 || rate < 0 {
		return fmt.Errorf("rate and burst must be a positive integer")
	}
	if rate != 0 && burst == 0 {
		return fmt.Errorf("if rate is set, burst must also be set")
	}
	if burst != 0 && rate == 0 {
		return fmt.Errorf("if burst is set, rate must also be set")
	}
	return nil
}
// getIfbDeviceName derives the ifb device name for a network/container pair: the first
// four hexadecimal characters of the SHA-1 digest of networkName followed by
// containerId. The same inputs always yield the same name.
func getIfbDeviceName(networkName string, containerId string) (string, error) {
	digest := sha1.New()
	if _, writeErr := digest.Write([]byte(networkName + containerId)); writeErr != nil {
		return "", writeErr
	}

	hexDigest := fmt.Sprintf("%x", digest.Sum(nil))
	return hexDigest[:4], nil
}
// getMTU returns the MTU of the link named deviceName, or -1 and the lookup error when
// the link cannot be found.
func getMTU(deviceName string) (int, error) {
	device, lookupErr := netlink.LinkByName(deviceName)
	if lookupErr != nil {
		return -1, lookupErr
	}

	mtu := device.Attrs().MTU
	return mtu, nil
}
// getHostInterface returns the first interface in the list that has no sandbox set,
// i.e. the first one not attributed to a container namespace. It returns an error when
// every interface has a sandbox or the list is empty.
func getHostInterface(interfaces []*current.Interface) (*current.Interface, error) {
	for i := range interfaces {
		if candidate := interfaces[i]; candidate.Sandbox == "" {
			return candidate, nil
		}
	}
	return nil, errors.New("no host interface found")
}
// cmdAdd is called for ADD requests. It parses the configuration, applies the
// requested shaping to the host-side interface reported by the previous plugin, and
// prints the (possibly extended) previous result for the next plugin in the chain.
//
//   - Ingress limits install a tbf qdisc directly on the host interface.
//   - Egress limits create an ifb device (named via getIfbDeviceName, with the host
//     interface's MTU), append it to the result's interfaces, and redirect the host
//     interface's incoming traffic through a tbf qdisc on that ifb device.
//
// It returns an error when the configuration is invalid, when no previous result was
// supplied, or when any of the device/qdisc operations fails.
func cmdAdd(args *skel.CmdArgs) error {
	conf, err := parseConfig(args.StdinData)
	if err != nil {
		return err
	}

	// Every path below either passes PrevResult through or inspects it, so reject a
	// missing previous result before doing anything else (including the no-op path,
	// which would otherwise hand a nil result to PrintResult).
	if conf.PrevResult == nil {
		return fmt.Errorf("must be called as chained plugin")
	}

	// No traffic shaping was requested, so just pass the previous result through.
	if conf.IngressRate == 0 && conf.IngressBurst == 0 && conf.EgressRate == 0 && conf.EgressBurst == 0 {
		return types.PrintResult(conf.PrevResult, conf.CNIVersion)
	}

	hostInterface, err := getHostInterface(conf.PrevResult.Interfaces)
	if err != nil {
		return err
	}

	if conf.IngressRate > 0 && conf.IngressBurst > 0 {
		if err := CreateIngressQdisc(conf.IngressRate, conf.IngressBurst, hostInterface.Name); err != nil {
			return err
		}
	}

	if conf.EgressRate > 0 && conf.EgressBurst > 0 {
		// The ifb device mirrors the host interface's MTU.
		mtu, err := getMTU(hostInterface.Name)
		if err != nil {
			return err
		}

		ifbDeviceName, err := getIfbDeviceName(conf.Name, args.ContainerID)
		if err != nil {
			return err
		}

		if err := CreateIfb(ifbDeviceName, mtu); err != nil {
			return err
		}

		// Look the new device up again to report its hardware address in the result.
		ifbDevice, err := netlink.LinkByName(ifbDeviceName)
		if err != nil {
			return err
		}

		conf.PrevResult.Interfaces = append(conf.PrevResult.Interfaces, &current.Interface{
			Name: ifbDeviceName,
			Mac:  ifbDevice.Attrs().HardwareAddr.String(),
		})

		if err := CreateEgressQdisc(conf.EgressRate, conf.EgressBurst, hostInterface.Name, ifbDeviceName); err != nil {
			return err
		}
	}

	// Pass through the result for the next plugin.
	return types.PrintResult(conf.PrevResult, conf.CNIVersion)
}
// cmdDel is called for DELETE requests. It recomputes the ifb device name from the
// network name and container ID and tears that device down; TeardownIfb treats a
// device that does not exist as success.
func cmdDel(args *skel.CmdArgs) error {
	conf, err := parseConfig(args.StdinData)
	if err != nil {
		return err
	}

	ifbName, err := getIfbDeviceName(conf.Name, args.ContainerID)
	if err != nil {
		return err
	}

	return TeardownIfb(ifbName)
}
// main hands cmdAdd and cmdDel to the CNI plugin skeleton, declaring support for CNI
// versions 0.3.0, 0.3.1 and version.Current().
// NOTE(review): this file declares `package bandwidth`, while Go only builds an
// executable entry point from package main — confirm the package clause.
func main() {
skel.PluginMain(cmdAdd, cmdDel, version.PluginSupports("0.3.0", "0.3.1", version.Current()))
}