diff --git a/pkg/testutils/echo/client/client.go b/pkg/testutils/echo/client/client.go index 97a8adec..5e4096cb 100644 --- a/pkg/testutils/echo/client/client.go +++ b/pkg/testutils/echo/client/client.go @@ -10,19 +10,32 @@ import ( func main() { target := flag.String("target", "", "the server address") payload := flag.String("message", "", "the message to send to the server") + protocol := flag.String("protocol", "tcp", "the protocol to use with the server [udp,tcp], default tcp") flag.Parse() if *target == "" || *payload == "" { flag.Usage() panic("invalid arguments") } - conn, err := net.Dial("tcp", *target) + + switch *protocol { + case "tcp": + connectTCP(*target, *payload) + case "udp": + connectUDP(*target, *payload) + default: + panic("invalid protocol") + } +} + +func connectTCP(target, payload string) { + conn, err := net.Dial("tcp", target) if err != nil { - panic(fmt.Sprintf("Failed to open connection to [%s] %v", *target, err)) + panic(fmt.Sprintf("Failed to open connection to [%s] %v", target, err)) } defer conn.Close() - _, err = conn.Write([]byte(*payload)) + _, err = conn.Write([]byte(payload)) if err != nil { panic("Failed to send payload") } @@ -30,8 +43,7 @@ func main() { if err != nil { panic("Failed to send payload") } - - buf := make([]byte, 4) + buf := make([]byte, 1024) for { n, err := conn.Read(buf) fmt.Print(string(buf[:n])) @@ -43,3 +55,36 @@ func main() { } } } + +// UDP uses a constant source port to trigger conntrack problems +func connectUDP(target, payload string) { + LocalAddr, err := net.ResolveUDPAddr("udp", ":54321") + if err != nil { + panic(fmt.Sprintf("Failed to resolve UDP local address on port 54321 %v", err)) + } + RemoteAddr, err := net.ResolveUDPAddr("udp", target) + if err != nil { + panic(fmt.Sprintf("Failed to resolve UDP remote address [%s] %v", target, err)) + } + conn, err := net.DialUDP("udp", LocalAddr, RemoteAddr) + if err != nil { + panic(fmt.Sprintf("Failed to open connection to [%s] %v", target, err)) + } + defer conn.Close() + + _, err = conn.Write([]byte(payload)) + if err != nil { + panic("Failed to send payload") + } + _, err = conn.Write([]byte("\n")) + if err != nil { + panic("Failed to send payload") + } + + buf := make([]byte, 1024) + n, err := conn.Read(buf) + if err != nil { + panic("Failed to read from socket") + } + fmt.Print(string(buf[:n])) +} diff --git a/pkg/testutils/echo/server/main.go b/pkg/testutils/echo/server/main.go index fe6ac46f..2db49847 100644 --- a/pkg/testutils/echo/server/main.go +++ b/pkg/testutils/echo/server/main.go @@ -10,6 +10,7 @@ import ( "bufio" "fmt" "io" + "log" "net" "os" "strings" @@ -17,21 +18,50 @@ import ( ) func main() { + // Start TCP server listener, err := net.Listen("tcp", ":") if err != nil { panic(err) } + defer listener.Close() + // use the same port for UDP _, port, err := net.SplitHostPort(listener.Addr().String()) if err != nil { panic(err) } fmt.Printf("127.0.0.1:%s\n", port) - for { - conn, err := listener.Accept() - if err != nil { - panic(err) + go func() { + for { + conn, err := listener.Accept() + if err != nil { + panic(err) + } + go handleConnection(conn) + } + }() + + // Start UDP server + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%s", port)) + if err != nil { + log.Fatalf("Error from net.ResolveUDPAddr(): %s", err) + } + sock, err := net.ListenUDP("udp", addr) + if err != nil { + log.Fatalf("Error from ListenUDP(): %s", err) + } + defer sock.Close() + + buffer := make([]byte, 1024) + for { + n, addr, err := sock.ReadFrom(buffer) + if err != nil { + log.Fatalf("Error from ReadFrom(): %s", err) + } + sock.SetWriteDeadline(time.Now().Add(1 * time.Minute)) + n, err = sock.WriteTo(buffer[0:n], addr) + if err != nil { + return } - go handleConnection(conn) } } @@ -53,5 +83,4 @@ func handleConnection(conn net.Conn) { fmt.Fprint(os.Stderr, err.Error()) return } - } diff --git a/pkg/utils/conntrack.go b/pkg/utils/conntrack.go new file mode 100644 index 00000000..0b11f6c1 --- /dev/null +++ b/pkg/utils/conntrack.go @@ -0,0 +1,73 @@ +// Copyright 2020 CNI authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "fmt" + "net" + + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" +) + +// Assigned Internet Protocol Numbers +// https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml +const ( + PROTOCOL_TCP = 6 + PROTOCOL_UDP = 17 + PROTOCOL_SCTP = 132 +) + +// getNetlinkFamily returns the Netlink IP family constant +func getNetlinkFamily(isIPv6 bool) netlink.InetFamily { + if isIPv6 { + return unix.AF_INET6 + } + return unix.AF_INET +} + +// DeleteConntrackEntriesForDstIP delete the conntrack entries for the connections +// specified by the given destination IP and protocol +func DeleteConntrackEntriesForDstIP(dstIP string, protocol uint8) error { + ip := net.ParseIP(dstIP) + if ip == nil { + return fmt.Errorf("error deleting connection tracking state, bad IP %s", ip) + } + family := getNetlinkFamily(ip.To4() == nil) + + filter := &netlink.ConntrackFilter{} + filter.AddIP(netlink.ConntrackOrigDstIP, ip) + filter.AddProtocol(protocol) + + _, err := netlink.ConntrackDeleteFilter(netlink.ConntrackTable, family, filter) + if err != nil { + return fmt.Errorf("error deleting connection tracking state for protocol: %d IP: %s, error: %v", protocol, ip, err) + } + return nil +} + +// DeleteConntrackEntriesForDstPort delete the conntrack entries for the connections specified +// by the given destination port, protocol and IP family +func DeleteConntrackEntriesForDstPort(port uint16, protocol uint8, family netlink.InetFamily) error { + filter := &netlink.ConntrackFilter{} + filter.AddPort(netlink.ConntrackOrigDstPort, port) + filter.AddProtocol(protocol) + + _, err := netlink.ConntrackDeleteFilter(netlink.ConntrackTable, family, filter) + if err != nil { + return fmt.Errorf("error deleting connection tracking state for protocol: %d Port: %d, error: %v", protocol, port, err) + } + return nil +} diff --git a/plugins/meta/portmap/main.go b/plugins/meta/portmap/main.go index 9d155916..bfd11bba 100644 --- a/plugins/meta/portmap/main.go +++ b/plugins/meta/portmap/main.go @@ -28,12 +28,14 @@ package main import ( "encoding/json" "fmt" + "log" "net" "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" "github.com/containernetworking/cni/pkg/version" + "golang.org/x/sys/unix" bv "github.com/containernetworking/plugins/pkg/utils/buildversion" ) @@ -89,12 +91,24 @@ func cmdAdd(args *skel.CmdArgs) error { if err := forwardPorts(netConf, netConf.ContIPv4); err != nil { return err } + // Delete conntrack entries for UDP to avoid conntrack blackholing traffic + // due to stale connections. We do that after the iptables rules are set, so + // the new traffic uses them. Failures are informative only. + if err := deletePortmapStaleConnections(netConf.RuntimeConfig.PortMaps, unix.AF_INET); err != nil { + log.Printf("failed to delete stale UDP conntrack entries for %s: %v", netConf.ContIPv4.IP, err) + } } if netConf.ContIPv6.IP != nil { if err := forwardPorts(netConf, netConf.ContIPv6); err != nil { return err } + // Delete conntrack entries for UDP to avoid conntrack blackholing traffic + // due to stale connections. We do that after the iptables rules are set, so + // the new traffic uses them. Failures are informative only. + if err := deletePortmapStaleConnections(netConf.RuntimeConfig.PortMaps, unix.AF_INET6); err != nil { + log.Printf("failed to delete stale UDP conntrack entries for %s: %v", netConf.ContIPv6.IP, err) + } } // Pass through the previous result diff --git a/plugins/meta/portmap/portmap.go b/plugins/meta/portmap/portmap.go index f4e28774..d35136f0 100644 --- a/plugins/meta/portmap/portmap.go +++ b/plugins/meta/portmap/portmap.go @@ -19,10 +19,12 @@ import ( "net" "sort" "strconv" + "strings" "github.com/containernetworking/plugins/pkg/utils" "github.com/containernetworking/plugins/pkg/utils/sysctl" "github.com/coreos/go-iptables/iptables" + "github.com/vishvananda/netlink" ) // This creates the chains to be added to iptables. The basic structure is @@ -42,10 +44,12 @@ import ( // The names of the top-level summary chains. // These should never be changed, or else upgrading will require manual // intervention. -const TopLevelDNATChainName = "CNI-HOSTPORT-DNAT" -const SetMarkChainName = "CNI-HOSTPORT-SETMARK" -const MarkMasqChainName = "CNI-HOSTPORT-MASQ" -const OldTopLevelSNATChainName = "CNI-HOSTPORT-SNAT" +const ( + TopLevelDNATChainName = "CNI-HOSTPORT-DNAT" + SetMarkChainName = "CNI-HOSTPORT-SETMARK" + MarkMasqChainName = "CNI-HOSTPORT-MASQ" + OldTopLevelSNATChainName = "CNI-HOSTPORT-SNAT" +) // forwardPorts establishes port forwarding to a given container IP. // containerNet.IP can be either v4 or v6. @@ -113,7 +117,6 @@ func forwardPorts(config *PortMapConf, containerNet net.IPNet) error { } func checkPorts(config *PortMapConf, containerNet net.IPNet) error { - dnatChain := genDnatChain(config.Name, config.ContainerID) fillDnatRules(&dnatChain, config, containerNet) @@ -189,7 +192,7 @@ func fillDnatRules(c *chain, config *PortMapConf, containerNet net.IPNet) { setMarkChainName = *config.ExternalSetMarkChain } - //Generate the dnat entry rules. We'll use multiport, but it ony accepts + // Generate the dnat entry rules. We'll use multiport, but it ony accepts // up to 15 rules, so partition the list if needed. // Do it in a stable order for testing protoPorts := groupByProto(entries) @@ -243,7 +246,8 @@ func fillDnatRules(c *chain, config *PortMapConf, containerNet net.IPNet) { ruleBase := []string{ "-p", entry.Protocol, - "--dport", strconv.Itoa(entry.HostPort)} + "--dport", strconv.Itoa(entry.HostPort), + } if addRuleBaseDst { ruleBase = append(ruleBase, "-d", entry.HostIP) @@ -406,3 +410,19 @@ func maybeGetIptables(isV6 bool) *iptables.IPTables { return ipt } + +// deletePortmapStaleConnections delete the UDP conntrack entries on the specified IP family +// from the ports mapped to the container +func deletePortmapStaleConnections(portMappings []PortMapEntry, family netlink.InetFamily) error { + for _, pm := range portMappings { + // skip if is not UDP + if strings.ToLower(pm.Protocol) != "udp" { + continue + } + err := utils.DeleteConntrackEntriesForDstPort(uint16(pm.HostPort), utils.PROTOCOL_UDP, family) + if err != nil { + return err + } + } + return nil +} diff --git a/plugins/meta/portmap/portmap_integ_test.go b/plugins/meta/portmap/portmap_integ_test.go index 63d67b4e..b4cca147 100644 --- a/plugins/meta/portmap/portmap_integ_test.go +++ b/plugins/meta/portmap/portmap_integ_test.go @@ -184,16 +184,16 @@ var _ = Describe("portmap integration tests", func() { Expect(cmd.Run()).To(Succeed()) // Sanity check: verify that the container is reachable directly - contOK := testEchoServer(contIP.String(), containerPort, "") + contOK := testEchoServer(contIP.String(), "tcp", containerPort, "") // Verify that a connection to the forwarded port works - dnatOK := testEchoServer(hostIP, hostPort, "") + dnatOK := testEchoServer(hostIP, "tcp", hostPort, "") // Verify that a connection to localhost works - snatOK := testEchoServer("127.0.0.1", hostPort, "") + snatOK := testEchoServer("127.0.0.1", "tcp", hostPort, "") // verify that hairpin works - hairpinOK := testEchoServer(hostIP, hostPort, targetNS.Path()) + hairpinOK := testEchoServer(hostIP, "tcp", hostPort, targetNS.Path()) // Cleanup session.Terminate() @@ -219,21 +219,216 @@ var _ = Describe("portmap integration tests", func() { } close(done) + }, TIMEOUT*9) + It("forwards a UDP port on ipv4 and keep working after creating a second container with the same HostPort", func(done Done) { + var err error + hostPort := rand.Intn(10000) + 1025 + runtimeConfig := libcni.RuntimeConf{ + ContainerID: fmt.Sprintf("unit-test-%d", hostPort), + NetNS: targetNS.Path(), + IfName: "eth0", + CapabilityArgs: map[string]interface{}{ + "portMappings": []map[string]interface{}{ + { + "hostPort": hostPort, + "containerPort": containerPort, + "protocol": "udp", + }, + }, + }, + } + + // Make delete idempotent, so we can clean up on failure + netDeleted := false + deleteNetwork := func() error { + if netDeleted { + return nil + } + netDeleted = true + return cniConf.DelNetworkList(context.TODO(), configList, &runtimeConfig) + } + + // Create the network + resI, err := cniConf.AddNetworkList(context.TODO(), configList, &runtimeConfig) + Expect(err).NotTo(HaveOccurred()) + defer deleteNetwork() + + // Undo Docker's forwarding policy + cmd := exec.Command("iptables", "-t", "filter", + "-P", "FORWARD", "ACCEPT") + cmd.Stderr = GinkgoWriter + err = cmd.Run() + Expect(err).NotTo(HaveOccurred()) + + result, err := current.GetResult(resI) + Expect(err).NotTo(HaveOccurred()) + var contIP net.IP + + for _, ip := range result.IPs { + intfIndex := *ip.Interface + if result.Interfaces[intfIndex].Sandbox == "" { + continue + } + contIP = ip.Address.IP + } + if contIP == nil { + Fail("could not determine container IP") + } + + hostIP := getLocalIP() + fmt.Fprintf(GinkgoWriter, "First container hostIP: %s:%d, contIP: %s:%d\n", + hostIP, hostPort, contIP, containerPort) + + // dump iptables-save output for debugging + cmd = exec.Command("iptables-save") + cmd.Stderr = GinkgoWriter + cmd.Stdout = GinkgoWriter + Expect(cmd.Run()).To(Succeed()) + + // dump ip routes output for debugging + cmd = exec.Command("ip", "route") + cmd.Stderr = GinkgoWriter + cmd.Stdout = GinkgoWriter + Expect(cmd.Run()).To(Succeed()) + + // dump ip addresses output for debugging + cmd = exec.Command("ip", "addr") + cmd.Stderr = GinkgoWriter + cmd.Stdout = GinkgoWriter + Expect(cmd.Run()).To(Succeed()) + + // Sanity check: verify that the container is reachable directly + fmt.Fprintln(GinkgoWriter, "Connect to container:", contIP.String(), containerPort) + contOK := testEchoServer(contIP.String(), "udp", containerPort, "") + + // Verify that a connection to the forwarded port works + fmt.Fprintln(GinkgoWriter, "Connect to host:", hostIP, hostPort) + dnatOK := testEchoServer(hostIP, "udp", hostPort, "") + + // Cleanup + session.Terminate() + err = deleteNetwork() + Expect(err).NotTo(HaveOccurred()) + + // Check that everything succeeded *after* we clean up the network + if !contOK { + Fail("connection direct to " + contIP.String() + " failed") + } + if !dnatOK { + Fail("Connection to " + hostIP + " was not forwarded") + } + // Create a second container + targetNS2, err := testutils.NewNS() + Expect(err).NotTo(HaveOccurred()) + fmt.Fprintln(GinkgoWriter, "namespace:", targetNS2.Path()) + + // Start an echo server and get the port + containerPort, session2, err := StartEchoServerInNamespace(targetNS2) + Expect(err).NotTo(HaveOccurred()) + + runtimeConfig2 := libcni.RuntimeConf{ + ContainerID: fmt.Sprintf("unit-test2-%d", hostPort), + NetNS: targetNS2.Path(), + IfName: "eth0", + CapabilityArgs: map[string]interface{}{ + "portMappings": []map[string]interface{}{ + { + "hostPort": hostPort, + "containerPort": containerPort, + "protocol": "udp", + }, + }, + }, + } + + // Make delete idempotent, so we can clean up on failure + net2Deleted := false + deleteNetwork2 := func() error { + if net2Deleted { + return nil + } + net2Deleted = true + return cniConf.DelNetworkList(context.TODO(), configList, &runtimeConfig2) + } + + // Create the network + resI2, err := cniConf.AddNetworkList(context.TODO(), configList, &runtimeConfig2) + Expect(err).NotTo(HaveOccurred()) + defer deleteNetwork2() + + result2, err := current.GetResult(resI2) + Expect(err).NotTo(HaveOccurred()) + var contIP2 net.IP + + for _, ip := range result2.IPs { + intfIndex := *ip.Interface + if result2.Interfaces[intfIndex].Sandbox == "" { + continue + } + contIP2 = ip.Address.IP + } + if contIP2 == nil { + Fail("could not determine container IP") + } + + fmt.Fprintf(GinkgoWriter, "Second container: hostIP: %s:%d, contIP: %s:%d\n", + hostIP, hostPort, contIP2, containerPort) + + // dump iptables-save output for debugging + cmd = exec.Command("iptables-save") + cmd.Stderr = GinkgoWriter + cmd.Stdout = GinkgoWriter + Expect(cmd.Run()).To(Succeed()) + + // dump ip routes output for debugging + cmd = exec.Command("ip", "route") + cmd.Stderr = GinkgoWriter + cmd.Stdout = GinkgoWriter + Expect(cmd.Run()).To(Succeed()) + + // dump ip addresses output for debugging + cmd = exec.Command("ip", "addr") + cmd.Stderr = GinkgoWriter + cmd.Stdout = GinkgoWriter + Expect(cmd.Run()).To(Succeed()) + + // Sanity check: verify that the container is reachable directly + fmt.Fprintln(GinkgoWriter, "Connect to container:", contIP2.String(), containerPort) + cont2OK := testEchoServer(contIP2.String(), "udp", containerPort, "") + + // Verify that a connection to the forwarded port works + fmt.Fprintln(GinkgoWriter, "Connect to host:", hostIP, hostPort) + dnat2OK := testEchoServer(hostIP, "udp", hostPort, "") + + // Cleanup + session2.Terminate() + err = deleteNetwork2() + Expect(err).NotTo(HaveOccurred()) + + // Check that everything succeeded *after* we clean up the network + if !cont2OK { + Fail("connection direct to " + contIP2.String() + " failed") + } + if !dnat2OK { + Fail("Connection to " + hostIP + " was not forwarded") + } + + close(done) }, TIMEOUT*9) }) }) // testEchoServer returns true if we found an echo server on the port -func testEchoServer(address string, port int, netns string) bool { +func testEchoServer(address, protocol string, port int, netns string) bool { message := "'Aliquid melius quam pessimum optimum non est.'" var cmd *exec.Cmd if netns != "" { netns = filepath.Base(netns) - cmd = exec.Command("ip", "netns", "exec", netns, echoClientBinaryPath, "--target", fmt.Sprintf("%s:%d", address, port), "--message", message) + cmd = exec.Command("ip", "netns", "exec", netns, echoClientBinaryPath, "--target", fmt.Sprintf("%s:%d", address, port), "--message", message, "--protocol", protocol) } else { - cmd = exec.Command(echoClientBinaryPath, "--target", fmt.Sprintf("%s:%d", address, port), "--message", message) + cmd = exec.Command(echoClientBinaryPath, "--target", fmt.Sprintf("%s:%d", address, port), "--message", message, "--protocol", protocol) } cmd.Stdin = bytes.NewBufferString(message) cmd.Stderr = GinkgoWriter