Use CNI cache to find container IP

This is an optimization that uses the results cache created by
CNI on the filesystem to store and fetch IP addresses for
containers in the core services and for functions. As part of
the change, the dependency on the syscall code from Weave net
has been removed, and the code should compile on MacOS again.

Updates and rebases the work in #38 by carlosedp

Tested in the original PR, further testing in the incoming
PR.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2021-02-19 12:12:14 +00:00
parent e99c49d4e5
commit 19a807c336
7 changed files with 57 additions and 196 deletions

View File

@ -1,6 +1,7 @@
package cninetwork package cninetwork
import ( import (
"bufio"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -10,6 +11,7 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"strings"
"github.com/containerd/containerd" "github.com/containerd/containerd"
gocni "github.com/containerd/go-cni" gocni "github.com/containerd/go-cni"
@ -19,21 +21,31 @@ import (
const ( const (
// CNIBinDir describes the directory where the CNI binaries are stored // CNIBinDir describes the directory where the CNI binaries are stored
CNIBinDir = "/opt/cni/bin" CNIBinDir = "/opt/cni/bin"
// CNIConfDir describes the directory where the CNI plugin's configuration is stored // CNIConfDir describes the directory where the CNI plugin's configuration is stored
CNIConfDir = "/etc/cni/net.d" CNIConfDir = "/etc/cni/net.d"
// NetNSPathFmt gives the path to the a process network namespace, given the pid // NetNSPathFmt gives the path to the a process network namespace, given the pid
NetNSPathFmt = "/proc/%d/ns/net" NetNSPathFmt = "/proc/%d/ns/net"
// CNIResultsDir is the directory CNI stores allocated IP for containers
CNIResultsDir = "/var/lib/cni/results" // CNIDataDir is the directory CNI stores allocated IP for containers
CNIDataDir = "/var/run/cni"
// defaultCNIConfFilename is the vanity filename of default CNI configuration file // defaultCNIConfFilename is the vanity filename of default CNI configuration file
defaultCNIConfFilename = "10-openfaas.conflist" defaultCNIConfFilename = "10-openfaas.conflist"
// defaultNetworkName names the "docker-bridge"-like CNI plugin-chain installed when no other CNI configuration is present. // defaultNetworkName names the "docker-bridge"-like CNI plugin-chain installed when no other CNI configuration is present.
// This value appears in iptables comments created by CNI. // This value appears in iptables comments created by CNI.
defaultNetworkName = "openfaas-cni-bridge" defaultNetworkName = "openfaas-cni-bridge"
// defaultBridgeName is the default bridge device name used in the defaultCNIConf // defaultBridgeName is the default bridge device name used in the defaultCNIConf
defaultBridgeName = "openfaas0" defaultBridgeName = "openfaas0"
// defaultSubnet is the default subnet used in the defaultCNIConf -- this value is set to not collide with common container networking subnets: // defaultSubnet is the default subnet used in the defaultCNIConf -- this value is set to not collide with common container networking subnets:
defaultSubnet = "10.62.0.0/16" defaultSubnet = "10.62.0.0/16"
// defaultIfPrefix is the interface name to be created in the container
defaultIfPrefix = "eth"
) )
// defaultCNIConf is a CNI configuration that enables network access to containers (docker-bridge style) // defaultCNIConf is a CNI configuration that enables network access to containers (docker-bridge style)
@ -50,6 +62,7 @@ var defaultCNIConf = fmt.Sprintf(`
"ipam": { "ipam": {
"type": "host-local", "type": "host-local",
"subnet": "%s", "subnet": "%s",
"dataDir": "%s",
"routes": [ "routes": [
{ "dst": "0.0.0.0/0" } { "dst": "0.0.0.0/0" }
] ]
@ -60,7 +73,7 @@ var defaultCNIConf = fmt.Sprintf(`
} }
] ]
} }
`, defaultNetworkName, defaultBridgeName, defaultSubnet) `, defaultNetworkName, defaultBridgeName, defaultSubnet, CNIDataDir)
// InitNetwork writes configlist file and initializes CNI network // InitNetwork writes configlist file and initializes CNI network
func InitNetwork() (gocni.CNI, error) { func InitNetwork() (gocni.CNI, error) {
@ -75,11 +88,14 @@ func InitNetwork() (gocni.CNI, error) {
netConfig := path.Join(CNIConfDir, defaultCNIConfFilename) netConfig := path.Join(CNIConfDir, defaultCNIConfFilename)
if err := ioutil.WriteFile(netConfig, []byte(defaultCNIConf), 644); err != nil { if err := ioutil.WriteFile(netConfig, []byte(defaultCNIConf), 644); err != nil {
return nil, fmt.Errorf("cannot write network config: %s", defaultCNIConfFilename) return nil, fmt.Errorf("cannot write network config: %s", defaultCNIConfFilename)
} }
// Initialize CNI library // Initialize CNI library
cni, err := gocni.New(gocni.WithPluginConfDir(CNIConfDir), cni, err := gocni.New(
gocni.WithPluginDir([]string{CNIBinDir})) gocni.WithPluginConfDir(CNIConfDir),
gocni.WithPluginDir([]string{CNIBinDir}),
gocni.WithInterfacePrefix(defaultIfPrefix),
)
if err != nil { if err != nil {
return nil, fmt.Errorf("error initializing cni: %s", err) return nil, fmt.Errorf("error initializing cni: %s", err)
@ -131,43 +147,33 @@ func DeleteCNINetwork(ctx context.Context, cni gocni.CNI, client *containerd.Cli
return errors.Wrapf(containerErr, "Unable to find container: %s, error: %s", name, containerErr) return errors.Wrapf(containerErr, "Unable to find container: %s, error: %s", name, containerErr)
} }
// GetIPAddress returns the IP address of the created container // GetIPAddress returns the IP address from container based on name and PID
func GetIPAddress(result *gocni.CNIResult, task containerd.Task) (net.IP, error) { func GetIPAddress(name string, PID uint32) (string, error) {
// Get the IP of the created interface processName := fmt.Sprintf("%s-%d", name, PID)
var ip net.IP CNIDir := path.Join(CNIDataDir, defaultNetworkName)
for ifName, config := range result.Interfaces {
if config.Sandbox == netNamespace(task) {
for _, ipConfig := range config.IPConfigs {
if ifName != "lo" && ipConfig.IP.To4() != nil {
ip = ipConfig.IP
}
}
}
}
if ip == nil {
return nil, fmt.Errorf("unable to get IP address for: %s", task.ID())
}
return ip, nil
}
func GetIPfromPID(pid int) (*net.IP, error) { files, err := ioutil.ReadDir(CNIDir)
// https://github.com/weaveworks/weave/blob/master/net/netdev.go
peerIDs, err := ConnectedToBridgeVethPeerIds(defaultBridgeName)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to find peers on: %s %s", defaultBridgeName, err) return "", fmt.Errorf("failed to read CNI dir for container %s: %v", name, err)
} }
addrs, addrsErr := GetNetDevsByVethPeerIds(pid, peerIDs) for _, file := range files {
if addrsErr != nil { f, err := os.Open(filepath.Join(CNIDir, file.Name()))
return nil, fmt.Errorf("unable to find address for veth pair using: %v %s", peerIDs, addrsErr) if err != nil {
return "", fmt.Errorf("failed to open CNI IP file for %s/%s: %v", CNIDir, file.Name(), err)
} }
reader := bufio.NewReader(f)
if len(addrs) > 0 && len(addrs[0].CIDRs) > 0 { text, _ := reader.ReadString('\n')
return &addrs[0].CIDRs[0].IP, nil if strings.Contains(text, processName) {
i, _ := reader.ReadString('\n')
if strings.Contains(i, defaultIfPrefix) {
f.Close()
return file.Name(), nil
} }
}
return nil, fmt.Errorf("no IP found for function") f.Close()
}
return "", fmt.Errorf("unable to get IP address for container %s", name)
} }
// CNIGateway returns the gateway for default subnet // CNIGateway returns the gateway for default subnet

View File

@ -1,118 +0,0 @@
// Copyright Weaveworks
// github.com/weaveworks/weave/net
package cninetwork
import (
"fmt"
"net"
"os"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
)
type Dev struct {
Name string `json:"Name,omitempty"`
MAC net.HardwareAddr `json:"MAC,omitempty"`
CIDRs []*net.IPNet `json:"CIDRs,omitempty"`
}
// ConnectedToBridgeVethPeerIds returns peer indexes of veth links connected to
// the given bridge. The peer index is used to query from a container netns
// whether the container is connected to the bridge.
func ConnectedToBridgeVethPeerIds(bridgeName string) ([]int, error) {
var ids []int
br, err := netlink.LinkByName(bridgeName)
if err != nil {
return nil, err
}
links, err := netlink.LinkList()
if err != nil {
return nil, err
}
for _, link := range links {
if _, isveth := link.(*netlink.Veth); isveth && link.Attrs().MasterIndex == br.Attrs().Index {
peerID := link.Attrs().ParentIndex
if peerID == 0 {
// perhaps running on an older kernel where ParentIndex doesn't work.
// as fall-back, assume the peers are consecutive
peerID = link.Attrs().Index - 1
}
ids = append(ids, peerID)
}
}
return ids, nil
}
// Lookup the weave interface of a container
func GetWeaveNetDevs(processID int) ([]Dev, error) {
peerIDs, err := ConnectedToBridgeVethPeerIds("weave")
if err != nil {
return nil, err
}
return GetNetDevsByVethPeerIds(processID, peerIDs)
}
func GetNetDevsByVethPeerIds(processID int, peerIDs []int) ([]Dev, error) {
// Bail out if this container is running in the root namespace
netnsRoot, err := netns.GetFromPid(1)
if err != nil {
return nil, fmt.Errorf("unable to open root namespace: %s", err)
}
defer netnsRoot.Close()
netnsContainer, err := netns.GetFromPid(processID)
if err != nil {
// Unable to find a namespace for this process - just return nothing
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("unable to open process %d namespace: %s", processID, err)
}
defer netnsContainer.Close()
if netnsRoot.Equal(netnsContainer) {
return nil, nil
}
// convert list of peerIDs into a map for faster lookup
indexes := make(map[int]struct{})
for _, id := range peerIDs {
indexes[id] = struct{}{}
}
var netdevs []Dev
err = WithNetNS(netnsContainer, func() error {
links, err := netlink.LinkList()
if err != nil {
return err
}
for _, link := range links {
if _, found := indexes[link.Attrs().Index]; found {
netdev, err := linkToNetDev(link)
if err != nil {
return err
}
netdevs = append(netdevs, netdev)
}
}
return nil
})
return netdevs, err
}
// Get the weave bridge interface.
// NB: Should be called from the root network namespace.
func GetBridgeNetDev(bridgeName string) (Dev, error) {
link, err := netlink.LinkByName(bridgeName)
if err != nil {
return Dev{}, err
}
return linkToNetDev(link)
}

View File

@ -1,10 +0,0 @@
// +build darwin
package cninetwork
import "github.com/vishvananda/netlink"
func linkToNetDev(link netlink.Link) (Dev, error) {
return Dev{}, nil
}

View File

@ -1,19 +0,0 @@
// +build linux
package cninetwork
import "github.com/vishvananda/netlink"
func linkToNetDev(link netlink.Link) (Dev, error) {
addrs, err := netlink.AddrList(link, netlink.FAMILY_V4)
if err != nil {
return Dev{}, err
}
netDev := Dev{Name: link.Attrs().Name, MAC: link.Attrs().HardwareAddr}
for _, addr := range addrs {
netDev.CIDRs = append(netDev.CIDRs, addr.IPNet)
}
return netDev, nil
}

View File

@ -199,17 +199,18 @@ func createTask(ctx context.Context, client *containerd.Client, container contai
log.Printf("Container ID: %s\tTask ID %s:\tTask PID: %d\t\n", name, task.ID(), task.Pid()) log.Printf("Container ID: %s\tTask ID %s:\tTask PID: %d\t\n", name, task.ID(), task.Pid())
labels := map[string]string{} labels := map[string]string{}
network, err := cninetwork.CreateCNINetwork(ctx, cni, task, labels) _, err := cninetwork.CreateCNINetwork(ctx, cni, task, labels)
if err != nil { if err != nil {
return err return err
} }
ip, err := cninetwork.GetIPAddress(network, task) ip, err := cninetwork.GetIPAddress(name, task.Pid())
if err != nil { if err != nil {
return err return err
} }
log.Printf("%s has IP: %s.\n", name, ip.String())
log.Printf("%s has IP: %s.\n", name, ip)
_, waitErr := task.Wait(ctx) _, waitErr := task.Wait(ctx)
if waitErr != nil { if waitErr != nil {

View File

@ -3,10 +3,11 @@ package handlers
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/opencontainers/runtime-spec/specs-go"
"log" "log"
"strings" "strings"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/openfaas/faasd/pkg/cninetwork" "github.com/openfaas/faasd/pkg/cninetwork"
@ -106,11 +107,11 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
fn.pid = task.Pid() fn.pid = task.Pid()
// Get container IP address // Get container IP address
ip, err := cninetwork.GetIPfromPID(int(task.Pid())) ip, err := cninetwork.GetIPAddress(name, task.Pid())
if err != nil { if err != nil {
return Function{}, err return Function{}, err
} }
fn.IP = ip.String() fn.IP = ip
} }
} else { } else {
replicas = 0 replicas = 0

View File

@ -193,19 +193,19 @@ func (s *Supervisor) Start(svcs []Service) error {
} }
labels := map[string]string{} labels := map[string]string{}
network, err := cninetwork.CreateCNINetwork(ctx, s.cni, task, labels) _, err = cninetwork.CreateCNINetwork(ctx, s.cni, task, labels)
if err != nil { if err != nil {
log.Printf("Error creating CNI for %s: %s", svc.Name, err) log.Printf("Error creating CNI for %s: %s", svc.Name, err)
return err return err
} }
ip, err := cninetwork.GetIPAddress(network, task) ip, err := cninetwork.GetIPAddress(svc.Name, task.Pid())
if err != nil { if err != nil {
log.Printf("Error getting IP for %s: %s", svc.Name, err) log.Printf("Error getting IP for %s: %s", svc.Name, err)
return err return err
} }
log.Printf("%s has IP: %s\n", newContainer.ID(), ip.String()) log.Printf("%s has IP: %s\n", newContainer.ID(), ip)
hosts, err := ioutil.ReadFile("hosts") hosts, err := ioutil.ReadFile("hosts")
if err != nil { if err != nil {