bandwidth: possibility to exclude some subnets from traffic shaping

what changed:

we had to refactor the bandwidth plugin and switch from a classless qdisc (tbf)
to a classful qdisc (htb).

subnets are to be provided in config or runtimeconfig just like other parameters

unit and integration tests were adapted accordingly

unrelated changes:

test fixes: the most important tests (the ones actually checking the effectiveness of the traffic control)
were being silently skipped because of the ginkgo Measure deprecation

Signed-off-by: Raphael <oOraph@users.noreply.github.com>
This commit is contained in:
Raphael
2023-06-28 19:07:23 +02:00
committed by Tomofumi Hayashi
parent 597408952e
commit 52da39d3aa
7 changed files with 1653 additions and 552 deletions

View File

@ -15,6 +15,8 @@
package main
import (
"bytes"
"encoding/binary"
"fmt"
"net"
"syscall"
@ -24,14 +26,23 @@ import (
"github.com/containernetworking/plugins/pkg/ip"
)
const latencyInMillis = 25
const (
// latencyInMillis is a latency value expressed in milliseconds.
latencyInMillis = 25
// UncappedRate is the rate and ceiling given to the HTB class that serves the excluded subnets.
// NOTE(review): the unit (bits vs bytes per second) is not obvious from this file — confirm.
UncappedRate uint64 = 100_000_000_000
// DefaultClassMinorID is the minor ID of the default (rate-limited) HTB class.
// 48 is 0x30, which is the "30" written in the tc command comments of createHTB.
DefaultClassMinorID = 48
)
func CreateIfb(ifbDeviceName string, mtu int, qlen int) error {
if qlen < 1000 {
qlen = 1000
}
func CreateIfb(ifbDeviceName string, mtu int) error {
err := netlink.LinkAdd(&netlink.Ifb{
LinkAttrs: netlink.LinkAttrs{
Name: ifbDeviceName,
Flags: net.FlagUp,
MTU: mtu,
Name: ifbDeviceName,
Flags: net.FlagUp,
MTU: mtu,
TxQLen: qlen,
},
})
if err != nil {
@ -49,15 +60,15 @@ func TeardownIfb(deviceName string) error {
return err
}
// CreateIngressQdisc looks up the link named hostDeviceName and installs the
// traffic-shaping qdisc on it. rateInBits, burstInBits and excludeSubnets are
// passed through to createHTB together with the link index. A failed lookup is
// returned as a "get host device" error.
func CreateIngressQdisc(rateInBits, burstInBits uint64, hostDeviceName string) error {
func CreateIngressQdisc(rateInBits, burstInBits uint64, excludeSubnets []string, hostDeviceName string) error {
hostDevice, err := netlink.LinkByName(hostDeviceName)
if err != nil {
return fmt.Errorf("get host device: %s", err)
}
return createTBF(rateInBits, burstInBits, hostDevice.Attrs().Index)
return createHTB(rateInBits, burstInBits, hostDevice.Attrs().Index, excludeSubnets)
}
func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
func CreateEgressQdisc(rateInBits, burstInBits uint64, excludeSubnets []string, hostDeviceName string, ifbDeviceName string) error {
ifbDevice, err := netlink.LinkByName(ifbDeviceName)
if err != nil {
return fmt.Errorf("get ifb device: %s", err)
@ -105,43 +116,201 @@ func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, if
}
// throttle traffic on ifb device
err = createTBF(rateInBits, burstInBits, ifbDevice.Attrs().Index)
err = createHTB(rateInBits, burstInBits, ifbDevice.Attrs().Index, excludeSubnets)
if err != nil {
return fmt.Errorf("create ifb qdisc: %s", err)
// egress from the container/netns pov = ingress from the main netns/host pov
return fmt.Errorf("create htb container egress qos rules: %s", err)
}
return nil
}
func createTBF(rateInBits, burstInBits uint64, linkIndex int) error {
// Equivalent to
// tc qdisc add dev link root tbf
// rate netConf.BandwidthLimits.Rate
// burst netConf.BandwidthLimits.Burst
if rateInBits <= 0 {
return fmt.Errorf("invalid rate: %d", rateInBits)
}
if burstInBits <= 0 {
return fmt.Errorf("invalid burst: %d", burstInBits)
}
rateInBytes := rateInBits / 8
burstInBytes := burstInBits / 8
bufferInBytes := buffer(rateInBytes, uint32(burstInBytes))
latency := latencyInUsec(latencyInMillis)
limitInBytes := limit(rateInBytes, latency, uint32(burstInBytes))
// createHTB installs an HTB qdisc (handle 1:) at the root of the link identified
// by linkIndex and creates two classes under it:
//   - the default class (minor DefaultClassMinorID), whose rate is rateInBits/8
//     and whose ceiling is twice that rate;
//   - an "uncapped" class (1:1) whose rate and ceiling are UncappedRate.
//
// For every CIDR in excludeSubnets a u32 filter is added on the qdisc that sends
// matching packets to the uncapped class; everything else falls into the default
// class.
//
// An error is returned as soon as a subnet cannot be parsed or a netlink call
// fails. Nothing that was already created is removed in that case.
func createHTB(rateInBits, burstInBits uint64, linkIndex int, excludeSubnets []string) error {
// Each netlink call below is preceded by the tc command it is meant to be equivalent to.
qdisc := &netlink.Tbf{
// Step 1 qdisc
// cmd := exec.Command("/usr/sbin/tc", "qdisc", "add", "dev", interfaceName, "root", "handle", "1:", "htb", "default", "30")
// (tc writes class minors in hexadecimal: "30" is 0x30 == 48 == DefaultClassMinorID)
qdisc := &netlink.Htb{
QdiscAttrs: netlink.QdiscAttrs{
LinkIndex: linkIndex,
Handle: netlink.MakeHandle(1, 0),
Parent: netlink.HANDLE_ROOT,
},
Limit: limitInBytes,
Rate: rateInBytes,
Buffer: bufferInBytes,
Defcls: DefaultClassMinorID,
// Version and Rate2Quantum are kept at "the default values from source code".
// NOTE(review): presumably the defaults used by tc's q_htb.c — confirm.
Version: 3,
Rate2Quantum: 10,
}
err := netlink.QdiscAdd(qdisc)
if err != nil {
return fmt.Errorf("create qdisc: %s", err)
return fmt.Errorf("error while creating qdisc: %s", err)
}
// Step 2 classes
// The inputs are in bits; the class fields below are filled with byte-based values.
rateInBytes := rateInBits / 8
burstInBytes := burstInBits / 8
bufferInBytes := buffer(rateInBytes, uint32(burstInBytes))
// The capped class for all but excluded subnets
// cmd = exec.Command("/usr/sbin/tc", "class", "add", "dev", interfaceName, "parent", "1:", "classid", "1:30", "htb", "rate",
// fmt.Sprintf("%d", rateInBits), "burst", fmt.Sprintf("%d", burstInBits))
defClass := &netlink.HtbClass{
ClassAttrs: netlink.ClassAttrs{
LinkIndex: linkIndex,
Handle: netlink.MakeHandle(1, DefaultClassMinorID),
Parent: netlink.MakeHandle(1, 0),
},
Rate: rateInBytes,
Buffer: bufferInBytes,
// Let's set up the "burst" rate to twice the specified rate
Ceil: 2 * rateInBytes,
Cbuffer: bufferInBytes,
}
err = netlink.ClassAdd(defClass)
if err != nil {
return fmt.Errorf("error while creating htb default class: %s", err)
}
// The uncapped class for the excluded subnets
// cmd = exec.Command("/usr/sbin/tc", "class", "add", "dev", interfaceName, "parent", "1:", "classid", "1:1", "htb",
// "rate", "100000000000")
bigRate := UncappedRate
uncappedClass := &netlink.HtbClass{
ClassAttrs: netlink.ClassAttrs{
LinkIndex: linkIndex,
Handle: netlink.MakeHandle(1, 1),
Parent: qdisc.Handle,
},
Rate: bigRate,
Ceil: bigRate,
// No need for any burst, the minimum buffer size in q_htb.c should be enough to handle the rate which
// is already more than enough
}
err = netlink.ClassAdd(uncappedClass)
if err != nil {
return fmt.Errorf("error while creating htb uncapped class: %s", err)
}
// Now add filters to redirect excluded subnets to the uncapped class 1:1 instead of the default one
// (1:30 in tc notation, i.e. minor DefaultClassMinorID)
for _, subnet := range excludeSubnets {
// cmd = exec.Command("/usr/sbin/tc", "filter", "add", "dev", interfaceName, "parent", "1:", "protocol", protocol,
// "prio", "16", "u32", "match", "ip", "dst", subnet, "flowid", "1:1")
_, nw, err := net.ParseCIDR(subnet)
if err != nil {
return fmt.Errorf("bad subnet %s: %s", subnet, err)
}
var maskBytes []byte = nw.Mask
var subnetBytes []byte = nw.IP
if len(maskBytes) != len(subnetBytes) {
return fmt.Errorf("error using net lib for subnet %s len(maskBytes) != len(subnetBytes) "+
"(%d != %d) should not happen", subnet, len(maskBytes), len(subnetBytes))
}
// Start from the IPv6 settings and override them below when the subnet is IPv4.
// offset is the byte offset used for the first u32 key, keepBytes the address length in bytes.
isIpv4 := nw.IP.To4() != nil
protocol := syscall.ETH_P_IPV6
var prio uint16 = 15
var offset int32 = 24
keepBytes := 16
if isIpv4 {
protocol = syscall.ETH_P_IP
offset = 16
keepBytes = 4
// prio/pref needs to be changed if we change the protocol, looks like we cannot mix protocols with the same pref
prio = 16
}
// protocol := syscall.ETH_P_ALL
// NOTE(review): the two messages below say "ipv4" but these checks also run for IPv6 subnets.
if len(maskBytes) < keepBytes {
return fmt.Errorf("error with net lib, unexpected count of bytes for ipv4 mask (%d < %d)",
len(maskBytes), keepBytes)
}
if len(subnetBytes) < keepBytes {
return fmt.Errorf("error with net lib, unexpected count of bytes for ipv4 subnet (%d < %d)",
len(subnetBytes), keepBytes)
}
// Keep only the trailing keepBytes bytes of the mask and of the address.
maskBytes = maskBytes[len(maskBytes)-keepBytes:]
subnetBytes = subnetBytes[len(subnetBytes)-keepBytes:]
// For ipv4 we should have at most 1 key, for ipv6 at most 4
keys := make([]netlink.TcU32Key, 0, 4)
// Walk the mask 4 bytes at a time: each non-zero 32-bit word becomes one u32 key,
// and offset advances by 4 for every word whether or not a key was emitted.
for i := 0; i < len(maskBytes); i += 4 {
var mask, subnetI uint32
buf := bytes.NewReader(maskBytes[i : i+4])
err = binary.Read(buf, binary.BigEndian, &mask)
if err != nil {
return fmt.Errorf("error, htb filter, unable to build mask match filter, iter %d for subnet %s",
i, subnet)
}
if mask != 0 {
// If mask == 0, any value on this section will be a match and we do not need a filter for this
buf = bytes.NewReader(subnetBytes[i : i+4])
err = binary.Read(buf, binary.BigEndian, &subnetI)
if err != nil {
return fmt.Errorf("error, htb filter, unable to build subnet match filter, iter %d for subnet %s",
i, subnet)
}
keys = append(keys, netlink.TcU32Key{
Mask: mask,
Val: subnetI,
Off: offset,
OffMask: 0,
})
}
offset += 4
}
// Copy the keys into a slice whose capacity equals its length.
if len(keys) != cap(keys) {
shrinkedKeys := make([]netlink.TcU32Key, len(keys))
copied := copy(shrinkedKeys, keys)
if copied != len(keys) {
return fmt.Errorf("copy tc u32 keys error, for subnet %s copied %d != keys %d", subnet, copied, len(keys))
}
keys = shrinkedKeys
}
// Sanity check on the number of keys built above.
if isIpv4 && len(keys) > 1 {
return fmt.Errorf("error, htb ipv4 filter, unexpected rule length (%d > 1), for subnet %s",
len(keys), subnet)
} else if len(keys) > 4 {
return fmt.Errorf("error, htb ipv6 filter, unexpected rule length (%d > 4), for subnet %s",
len(keys), subnet)
}
// If len(keys) == 0, it means that we want to wildcard all traffic on the non default/uncapped class
// (in that case the filter is created with a nil selector)
var selector *netlink.TcU32Sel
if len(keys) > 0 {
selector = &netlink.TcU32Sel{
Nkeys: uint8(len(keys)),
Flags: netlink.TC_U32_TERMINAL,
Keys: keys,
}
}
tcFilter := netlink.U32{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: linkIndex,
Parent: qdisc.Handle,
Priority: prio,
Protocol: uint16(protocol),
},
ClassId: uncappedClass.Handle,
Sel: selector,
}
err = netlink.FilterAdd(&tcFilter)
if err != nil {
return fmt.Errorf("error, unable to create htb filter, details %s", err)
}
}
return nil
}
@ -153,11 +322,3 @@ func time2Tick(time uint32) uint32 {
func buffer(rate uint64, burst uint32) uint32 {
return time2Tick(uint32(float64(burst) * float64(netlink.TIME_UNITS_PER_SEC) / float64(rate)))
}
// limit returns rate*latency/netlink.TIME_UNITS_PER_SEC, truncated to uint32,
// plus buffer. latency is expected in the same time unit as
// netlink.TIME_UNITS_PER_SEC counts per second.
func limit(rate uint64, latency float64, buffer uint32) uint32 {
	queued := float64(rate) * latency / float64(netlink.TIME_UNITS_PER_SEC)
	return uint32(queued) + buffer
}
// latencyInUsec converts a latency given in milliseconds into the time unit
// counted by netlink.TIME_UNITS_PER_SEC.
func latencyInUsec(latencyInMillis float64) float64 {
	seconds := latencyInMillis / 1000.0
	return float64(netlink.TIME_UNITS_PER_SEC) * seconds
}