Initial merge of faas-containerd

This patch completes part of the work in #20 by porting the code
for faas-containerd in-tree. When tested, I was able to deploy
and then remove figlet from the store on `x86_64`.

In a follow-up PR, duplication will be removed where possible
and consolidated with updated documentation.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2020-01-21 13:12:44 +00:00
committed by Alex Ellis
parent cda1fe78b1
commit 42e9c91ee9
52 changed files with 5260 additions and 3 deletions

View File

@ -0,0 +1,35 @@
package config
import (
"time"
types "github.com/openfaas/faas-provider/types"
)
type ProviderConfig struct {
// Sock is the address of the containerd socket
Sock string
}
// ReadFromEnv loads the FaaSConfig and the Containerd specific config form the env variables
func ReadFromEnv(hasEnv types.HasEnv) (*types.FaaSConfig, *ProviderConfig, error) {
config, err := types.ReadConfig{}.Read(hasEnv)
if err != nil {
return nil, nil, err
}
serviceTimeout := types.ParseIntOrDurationValue(hasEnv.Getenv("service_timeout"), time.Second*60)
config.EnableHealth = true
config.ReadTimeout = serviceTimeout
config.WriteTimeout = serviceTimeout
port := types.ParseIntValue(hasEnv.Getenv("port"), 8081)
config.TCPPort = &port
providerConfig := &ProviderConfig{
Sock: types.ParseString(hasEnv.Getenv("sock"), "/run/containerd/containerd.sock"),
}
return config, providerConfig, nil
}

View File

@ -0,0 +1,107 @@
package config
import (
"strconv"
"testing"
)
type EnvBucket struct {
Items map[string]string
}
func NewEnvBucket() EnvBucket {
return EnvBucket{
Items: make(map[string]string),
}
}
func (e EnvBucket) Getenv(key string) string {
return e.Items[key]
}
func (e EnvBucket) Setenv(key string, value string) {
e.Items[key] = value
}
func Test_SetSockByEnv(t *testing.T) {
defaultSock := "/run/containerd/containerd.sock"
expectedSock := "/non/default/value.sock"
env := NewEnvBucket()
_, config, err := ReadFromEnv(env)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if config.Sock != defaultSock {
t.Fatalf("expected %q, got %q", defaultSock, config.Sock)
}
env.Setenv("sock", expectedSock)
_, config, err = ReadFromEnv(env)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if config.Sock != expectedSock {
t.Fatalf("expected %q, got %q", expectedSock, config.Sock)
}
}
func Test_SetServiceTimeout(t *testing.T) {
defaultTimeout := "1m0s"
env := NewEnvBucket()
config, _, err := ReadFromEnv(env)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if config.ReadTimeout.String() != defaultTimeout {
t.Fatalf("expected %q, got %q", defaultTimeout, config.ReadTimeout)
}
if config.WriteTimeout.String() != defaultTimeout {
t.Fatalf("expected %q, got %q", defaultTimeout, config.WriteTimeout)
}
newTimeout := "30s"
env.Setenv("service_timeout", newTimeout)
config, _, err = ReadFromEnv(env)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if config.ReadTimeout.String() != newTimeout {
t.Fatalf("expected %q, got %q", newTimeout, config.ReadTimeout)
}
if config.WriteTimeout.String() != newTimeout {
t.Fatalf("expected %q, got %q", newTimeout, config.WriteTimeout)
}
}
func Test_SetPort(t *testing.T) {
defaultPort := 8081
env := NewEnvBucket()
config, _, err := ReadFromEnv(env)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if config.TCPPort == nil {
t.Fatal("expected non-nil TCPPort")
}
if *config.TCPPort != defaultPort {
t.Fatalf("expected %d, got %d", defaultPort, config.TCPPort)
}
newPort := 9091
newPortStr := strconv.Itoa(newPort)
env.Setenv("port", newPortStr)
config, _, err = ReadFromEnv(env)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if config.TCPPort == nil {
t.Fatal("expected non-nil TCPPort")
}
if *config.TCPPort != newPort {
t.Fatalf("expected %d, got %d", newPort, config.TCPPort)
}
}

View File

@ -0,0 +1,212 @@
package handlers
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path"
"path/filepath"
"github.com/containerd/containerd"
gocni "github.com/containerd/go-cni"
"github.com/pkg/errors"
)
const (
// CNIBinDir describes the directory where the CNI binaries are stored
CNIBinDir = "/opt/cni/bin"
// CNIConfDir describes the directory where the CNI plugin's configuration is stored
CNIConfDir = "/etc/cni/net.d"
// NetNSPathFmt gives the path to the a process network namespace, given the pid
NetNSPathFmt = "/proc/%d/ns/net"
// defaultCNIConfFilename is the vanity filename of default CNI configuration file
defaultCNIConfFilename = "10-openfaas.conflist"
// defaultNetworkName names the "docker-bridge"-like CNI plugin-chain installed when no other CNI configuration is present.
// This value appears in iptables comments created by CNI.
defaultNetworkName = "openfaas-cni-bridge"
// defaultBridgeName is the default bridge device name used in the defaultCNIConf
defaultBridgeName = "openfaas0"
// defaultSubnet is the default subnet used in the defaultCNIConf -- this value is set to not collide with common container networking subnets:
defaultSubnet = "10.62.0.0/16"
)
// defaultCNIConf is a CNI configuration that enables network access to containers (docker-bridge style)
var defaultCNIConf = fmt.Sprintf(`
{
"cniVersion": "0.4.0",
"name": "%s",
"plugins": [
{
"type": "bridge",
"bridge": "%s",
"isGateway": true,
"ipMasq": true,
"ipam": {
"type": "host-local",
"subnet": "%s",
"routes": [
{ "dst": "0.0.0.0/0" }
]
}
},
{
"type": "firewall"
}
]
}
`, defaultNetworkName, defaultBridgeName, defaultSubnet)
// InitNetwork writes configlist file and initializes CNI network
func InitNetwork() (gocni.CNI, error) {
log.Printf("Writing network config...\n")
if !dirExists(CNIConfDir) {
if err := os.MkdirAll(CNIConfDir, 0755); err != nil {
return nil, fmt.Errorf("cannot create directory: %s", CNIConfDir)
}
}
netConfig := path.Join(CNIConfDir, defaultCNIConfFilename)
if err := ioutil.WriteFile(netConfig, []byte(defaultCNIConf), 644); err != nil {
return nil, fmt.Errorf("cannot write network config: %s", defaultCNIConfFilename)
}
// Initialize CNI library
cni, err := gocni.New(gocni.WithPluginConfDir(CNIConfDir),
gocni.WithPluginDir([]string{CNIBinDir}))
if err != nil {
return nil, fmt.Errorf("error initializing cni: %s", err)
}
// Load the cni configuration
if err := cni.Load(gocni.WithLoNetwork, gocni.WithConfListFile(filepath.Join(CNIConfDir, defaultCNIConfFilename))); err != nil {
return nil, fmt.Errorf("failed to load cni configuration: %v", err)
}
return cni, nil
}
// CreateCNINetwork creates a CNI network interface and attaches it to the context
func CreateCNINetwork(ctx context.Context, cni gocni.CNI, task containerd.Task, labels map[string]string) (*gocni.CNIResult, error) {
id := NetID(task)
netns := NetNamespace(task)
result, err := cni.Setup(ctx, id, netns, gocni.WithLabels(labels))
if err != nil {
return nil, errors.Wrapf(err, "Failed to setup network for task %q: %v", id, err)
}
return result, nil
}
// DeleteCNINetwork deletes a CNI network based on task ID and Pid
func DeleteCNINetwork(ctx context.Context, cni gocni.CNI, client *containerd.Client, name string) error {
container, containerErr := client.LoadContainer(ctx, name)
if containerErr == nil {
task, err := container.Task(ctx, nil)
if err != nil {
log.Printf("[Delete] unable to find task for container: %s\n", name)
return nil
}
log.Printf("[Delete] removing CNI network for: %s\n", task.ID())
id := NetID(task)
netns := NetNamespace(task)
if err := cni.Remove(ctx, id, netns); err != nil {
return errors.Wrapf(err, "Failed to remove network for task: %q, %v", id, err)
}
log.Printf("[Delete] removed: %s from namespace: %s, ID: %s\n", name, netns, id)
return nil
}
return errors.Wrapf(containerErr, "Unable to find container: %s, error: %s", name, containerErr)
}
// GetIPAddress returns the IP address of the created container
func GetIPAddress(result *gocni.CNIResult, task containerd.Task) (net.IP, error) {
// Get the IP of the created interface
var ip net.IP
for ifName, config := range result.Interfaces {
if config.Sandbox == NetNamespace(task) {
for _, ipConfig := range config.IPConfigs {
if ifName != "lo" && ipConfig.IP.To4() != nil {
ip = ipConfig.IP
}
}
}
}
if ip == nil {
return nil, fmt.Errorf("unable to get IP address for: %s", task.ID())
}
return ip, nil
}
func GetIPfromPID(pid int) (*net.IP, error) {
// https://github.com/weaveworks/weave/blob/master/net/netdev.go
peerIDs, err := ConnectedToBridgeVethPeerIds(defaultBridgeName)
if err != nil {
return nil, fmt.Errorf("unable to find peers on: %s %s", defaultBridgeName, err)
}
addrs, addrsErr := GetNetDevsByVethPeerIds(pid, peerIDs)
if addrsErr != nil {
return nil, fmt.Errorf("unable to find address for veth pair using: %v %s", peerIDs, addrsErr)
}
return &addrs[0].CIDRs[0].IP, nil
}
// NetID generates the network IF based on task name and task PID
func NetID(task containerd.Task) string {
return fmt.Sprintf("%s-%d", task.ID(), task.Pid())
}
// NetNamespace generates the namespace path based on task PID.
func NetNamespace(task containerd.Task) string {
return fmt.Sprintf(NetNSPathFmt, task.Pid())
}
func dirEmpty(dirname string) (isEmpty bool) {
if !dirExists(dirname) {
return
}
f, err := os.Open(dirname)
if err != nil {
return
}
defer func() { _ = f.Close() }()
// If the first file is EOF, the directory is empty
if _, err = f.Readdir(1); err == io.EOF {
isEmpty = true
}
return isEmpty
}
func dirExists(dirname string) bool {
exists, info := pathExists(dirname)
if !exists {
return false
}
return info.IsDir()
}
func pathExists(path string) (bool, os.FileInfo) {
info, err := os.Stat(path)
if os.IsNotExist(err) {
return false, nil
}
return true, info
}

View File

@ -0,0 +1,76 @@
package handlers
import (
"context"
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
)
type Function struct {
name string
namespace string
image string
pid uint32
replicas int
IP string
}
const (
// FunctionNamespace is the containerd namespace functions are created
FunctionNamespace = "openfaas-fn"
)
// ListFunctions returns a map of all functions with running tasks on namespace
func ListFunctions(client *containerd.Client) (map[string]Function, error) {
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
functions := make(map[string]Function)
containers, _ := client.Containers(ctx)
for _, k := range containers {
name := k.ID()
functions[name], _ = GetFunction(client, name)
}
return functions, nil
}
// GetFunction returns a function that matches name
func GetFunction(client *containerd.Client, name string) (Function, error) {
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
c, err := client.LoadContainer(ctx, name)
if err == nil {
image, _ := c.Image(ctx)
f := Function{
name: c.ID(),
namespace: FunctionNamespace,
image: image.Name(),
}
replicas := 0
task, err := c.Task(ctx, nil)
if err == nil {
// Task for container exists
svc, err := task.Status(ctx)
if err != nil {
return Function{}, fmt.Errorf("unable to get task status for container: %s %s", name, err)
}
if svc.Status == "running" {
replicas = 1
f.pid = task.Pid()
// Get container IP address
ip, _ := GetIPfromPID(int(task.Pid()))
f.IP = ip.String()
}
} else {
replicas = 0
}
f.replicas = replicas
return f, nil
}
return Function{}, fmt.Errorf("unable to find function: %s, error %s", name, err)
}

View File

@ -0,0 +1,68 @@
package handlers
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/alexellis/faasd/pkg/service"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas/gateway/requests"
)
func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
http.Error(w, "expected a body", http.StatusBadRequest)
return
}
defer r.Body.Close()
body, _ := ioutil.ReadAll(r.Body)
log.Printf("[Delete] request: %s\n", string(body))
req := requests.DeleteFunctionRequest{}
err := json.Unmarshal(body, &req)
if err != nil {
log.Printf("[Delete] error parsing input: %s\n", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
name := req.FunctionName
function, err := GetFunction(client, name)
if err != nil {
msg := fmt.Sprintf("service %s not found", name)
log.Printf("[Delete] %s\n", msg)
http.Error(w, msg, http.StatusNotFound)
return
}
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
if function.replicas != 0 {
err = DeleteCNINetwork(ctx, cni, client, name)
if err != nil {
log.Printf("[Delete] error removing CNI network for %s, %s\n", name, err)
}
}
containerErr := service.Remove(ctx, client, name)
if containerErr != nil {
log.Printf("[Delete] error removing %s, %s\n", name, containerErr)
http.Error(w, containerErr.Error(), http.StatusInternalServerError)
return
}
log.Printf("[Delete] deleted %s\n", name)
}
}

View File

@ -0,0 +1,179 @@
package handlers
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"strings"
"github.com/alexellis/faasd/pkg/service"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
gocni "github.com/containerd/go-cni"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/openfaas/faas-provider/types"
"github.com/pkg/errors"
)
func MakeDeployHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
http.Error(w, "expected a body", http.StatusBadRequest)
return
}
defer r.Body.Close()
body, _ := ioutil.ReadAll(r.Body)
log.Printf("[Deploy] request: %s\n", string(body))
req := types.FunctionDeployment{}
err := json.Unmarshal(body, &req)
if err != nil {
log.Printf("[Deploy] - error parsing input: %s\n", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
name := req.Service
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
deployErr := deploy(ctx, req, client, cni)
if deployErr != nil {
log.Printf("[Deploy] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
return
}
}
}
func deploy(ctx context.Context, req types.FunctionDeployment, client *containerd.Client, cni gocni.CNI) error {
imgRef := "docker.io/" + req.Image
if strings.Index(req.Image, ":") == -1 {
imgRef = imgRef + ":latest"
}
snapshotter := ""
if val, ok := os.LookupEnv("snapshotter"); ok {
snapshotter = val
}
image, err := service.PrepareImage(ctx, client, imgRef, snapshotter)
if err != nil {
return errors.Wrapf(err, "unable to pull image %s", imgRef)
}
size, _ := image.Size(ctx)
log.Printf("Deploy %s size: %d\n", image.Name(), size)
envs := prepareEnv(req.EnvProcess, req.EnvVars)
mounts := getMounts()
name := req.Service
container, err := client.NewContainer(
ctx,
name,
containerd.WithImage(image),
containerd.WithSnapshotter(snapshotter),
containerd.WithNewSnapshot(req.Service+"-snapshot", image),
containerd.WithNewSpec(oci.WithImageConfig(image),
oci.WithCapabilities([]string{"CAP_NET_RAW"}),
oci.WithMounts(mounts),
oci.WithEnv(envs)),
)
if err != nil {
return fmt.Errorf("unable to create container: %s, error: %s", name, err)
}
return createTask(ctx, client, container, cni)
}
func createTask(ctx context.Context, client *containerd.Client, container containerd.Container, cni gocni.CNI) error {
name := container.ID()
task, taskErr := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
if taskErr != nil {
return fmt.Errorf("unable to start task: %s, error: %s", name, taskErr)
}
log.Printf("Container ID: %s\tTask ID %s:\tTask PID: %d\t\n", name, task.ID(), task.Pid())
labels := map[string]string{}
network, err := CreateCNINetwork(ctx, cni, task, labels)
if err != nil {
return err
}
ip, err := GetIPAddress(network, task)
if err != nil {
return err
}
log.Printf("%s has IP: %s.\n", name, ip.String())
_, waitErr := task.Wait(ctx)
if waitErr != nil {
return errors.Wrapf(waitErr, "Unable to wait for task to start: %s", name)
}
if startErr := task.Start(ctx); startErr != nil {
return errors.Wrapf(startErr, "Unable to start task: %s", name)
}
return nil
}
func prepareEnv(envProcess string, reqEnvVars map[string]string) []string {
envs := []string{}
fprocessFound := false
fprocess := "fprocess=" + envProcess
if len(envProcess) > 0 {
fprocessFound = true
}
for k, v := range reqEnvVars {
if k == "fprocess" {
fprocessFound = true
fprocess = v
} else {
envs = append(envs, k+"="+v)
}
}
if fprocessFound {
envs = append(envs, fprocess)
}
return envs
}
func getMounts() []specs.Mount {
wd, _ := os.Getwd()
mounts := []specs.Mount{}
mounts = append(mounts, specs.Mount{
Destination: "/etc/resolv.conf",
Type: "bind",
Source: path.Join(wd, "resolv.conf"),
Options: []string{"rbind", "ro"},
})
mounts = append(mounts, specs.Mount{
Destination: "/etc/hosts",
Type: "bind",
Source: path.Join(wd, "hosts"),
Options: []string{"rbind", "ro"},
})
return mounts
}

View File

@ -0,0 +1,43 @@
package handlers
import (
"encoding/json"
"net/http"
"github.com/openfaas/faas-provider/types"
)
const (
//OrchestrationIdentifier identifier string for provider orchestration
OrchestrationIdentifier = "containerd"
//ProviderName name of the provider
ProviderName = "faas-containerd"
)
//MakeInfoHandler creates handler for /system/info endpoint
func MakeInfoHandler(version, sha string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
infoResponse := types.InfoResponse{
Orchestration: OrchestrationIdentifier,
Provider: ProviderName,
Version: types.ProviderVersion{
Release: version,
SHA: sha,
},
}
jsonOut, marshalErr := json.Marshal(infoResponse)
if marshalErr != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonOut)
}
}

View File

@ -0,0 +1,39 @@
package handlers
import (
"encoding/json"
"github.com/openfaas/faas-provider/types"
"net/http/httptest"
"testing"
)
func Test_InfoHandler(t *testing.T) {
sha := "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
version := "0.0.1"
handler := MakeInfoHandler(version, sha)
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/", nil)
handler(w, r)
resp := types.InfoResponse{}
err := json.Unmarshal(w.Body.Bytes(), &resp)
if err != nil {
t.Fatalf("unexpected error unmarshalling the response")
}
if resp.Provider != ProviderName {
t.Fatalf("expected provider %q, got %q", ProviderName, resp.Provider)
}
if resp.Orchestration != OrchestrationIdentifier {
t.Fatalf("expected orchestration %q, got %q", OrchestrationIdentifier, resp.Orchestration)
}
if resp.Version.SHA != sha {
t.Fatalf("expected orchestration %q, got %q", sha, resp.Version.SHA)
}
if resp.Version.Release != version {
t.Fatalf("expected release %q, got %q", version, resp.Version.Release)
}
}

View File

@ -0,0 +1,39 @@
package handlers
import (
"fmt"
"log"
"net/url"
"github.com/containerd/containerd"
)
const watchdogPort = 8080
type InvokeResolver struct {
client *containerd.Client
}
func NewInvokeResolver(client *containerd.Client) *InvokeResolver {
return &InvokeResolver{client: client}
}
func (i *InvokeResolver) Resolve(functionName string) (url.URL, error) {
log.Printf("Resolve: %q\n", functionName)
function, err := GetFunction(i.client, functionName)
if err != nil {
return url.URL{}, fmt.Errorf("%s not found", functionName)
}
serviceIP := function.IP
urlStr := fmt.Sprintf("http://%s:%d", serviceIP, watchdogPort)
urlRes, err := url.Parse(urlStr)
if err != nil {
return url.URL{}, err
}
return *urlRes, nil
}

View File

@ -0,0 +1,66 @@
// +build go1.10
// Copyright Weaveworks
// github.com/weaveworks/weave/net
package handlers
import (
"errors"
"fmt"
"path/filepath"
"runtime"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
)
var ErrLinkNotFound = errors.New("Link not found")
func WithNetNS(ns netns.NsHandle, work func() error) error {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldNs, err := netns.Get()
if err == nil {
defer oldNs.Close()
err = netns.Set(ns)
if err == nil {
defer netns.Set(oldNs)
err = work()
}
}
return err
}
func WithNetNSLink(ns netns.NsHandle, ifName string, work func(link netlink.Link) error) error {
return WithNetNS(ns, func() error {
link, err := netlink.LinkByName(ifName)
if err != nil {
if err.Error() == errors.New("Link not found").Error() {
return ErrLinkNotFound
}
return err
}
return work(link)
})
}
func WithNetNSByPath(path string, work func() error) error {
ns, err := netns.GetFromPath(path)
if err != nil {
return err
}
return WithNetNS(ns, work)
}
func NSPathByPid(pid int) string {
return NSPathByPidWithRoot("/", pid)
}
func NSPathByPidWithRoot(root string, pid int) string {
return filepath.Join(root, fmt.Sprintf("/proc/%d/ns/net", pid))
}

View File

@ -0,0 +1,38 @@
package handlers
import (
"encoding/json"
"log"
"net/http"
"github.com/containerd/containerd"
"github.com/openfaas/faas-provider/types"
)
func MakeReadHandler(client *containerd.Client) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
res := []types.FunctionStatus{}
funcs, err := ListFunctions(client)
if err != nil {
log.Printf("[Read] error listing functions. Error: %s\n", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, function := range funcs {
res = append(res, types.FunctionStatus{
Name: function.name,
Image: function.image,
Replicas: uint64(function.replicas),
Namespace: function.namespace,
})
}
body, _ := json.Marshal(res)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(body)
}
}

View File

@ -0,0 +1,34 @@
package handlers
import (
"encoding/json"
"net/http"
"github.com/containerd/containerd"
"github.com/gorilla/mux"
"github.com/openfaas/faas-provider/types"
)
func MakeReplicaReaderHandler(client *containerd.Client) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
functionName := vars["name"]
if f, err := GetFunction(client, functionName); err == nil {
found := types.FunctionStatus{
Name: functionName,
AvailableReplicas: uint64(f.replicas),
Replicas: uint64(f.replicas),
Namespace: f.namespace,
}
functionBytes, _ := json.Marshal(found)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(functionBytes)
} else {
w.WriteHeader(http.StatusNotFound)
}
}
}

View File

@ -0,0 +1,102 @@
package handlers
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas-provider/types"
)
func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
http.Error(w, "expected a body", http.StatusBadRequest)
return
}
defer r.Body.Close()
body, _ := ioutil.ReadAll(r.Body)
log.Printf("[Scale] request: %s\n", string(body))
req := types.ScaleServiceRequest{}
err := json.Unmarshal(body, &req)
if err != nil {
log.Printf("[Scale] error parsing input: %s\n", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
name := req.ServiceName
if _, err := GetFunction(client, name); err != nil {
msg := fmt.Sprintf("service %s not found", name)
log.Printf("[Scale] %s\n", msg)
http.Error(w, msg, http.StatusNotFound)
return
}
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctr, ctrErr := client.LoadContainer(ctx, name)
if ctrErr != nil {
msg := fmt.Sprintf("cannot load service %s, error: %s", name, ctrErr)
log.Printf("[Scale] %s\n", msg)
http.Error(w, msg, http.StatusNotFound)
return
}
taskExists := true
task, taskErr := ctr.Task(ctx, nil)
if taskErr != nil {
msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr)
log.Printf("[Scale] %s\n", msg)
taskExists = false
}
if req.Replicas > 0 {
if taskExists {
if status, statusErr := task.Status(ctx); statusErr == nil {
if status.Status == containerd.Paused {
if resumeErr := task.Resume(ctx); resumeErr != nil {
log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
http.Error(w, resumeErr.Error(), http.StatusBadRequest)
}
}
}
} else {
deployErr := createTask(ctx, client, ctr, cni)
if deployErr != nil {
log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
return
}
return
}
} else {
if taskExists {
if status, statusErr := task.Status(ctx); statusErr == nil {
if status.Status == containerd.Running {
if pauseErr := task.Pause(ctx); pauseErr != nil {
log.Printf("[Scale] error pausing task %s, error: %s\n", name, pauseErr)
http.Error(w, pauseErr.Error(), http.StatusBadRequest)
}
}
}
}
}
}
}

View File

@ -0,0 +1,73 @@
package handlers
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/alexellis/faasd/pkg/service"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas-provider/types"
)
func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
http.Error(w, "expected a body", http.StatusBadRequest)
return
}
defer r.Body.Close()
body, _ := ioutil.ReadAll(r.Body)
log.Printf("[Update] request: %s\n", string(body))
req := types.FunctionDeployment{}
err := json.Unmarshal(body, &req)
if err != nil {
log.Printf("[Update] error parsing input: %s\n", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
name := req.Service
function, err := GetFunction(client, name)
if err != nil {
msg := fmt.Sprintf("service %s not found", name)
log.Printf("[Update] %s\n", msg)
http.Error(w, msg, http.StatusNotFound)
return
}
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
if function.replicas != 0 {
err = DeleteCNINetwork(ctx, cni, client, name)
if err != nil {
log.Printf("[Update] error removing CNI network for %s, %s\n", name, err)
}
}
containerErr := service.Remove(ctx, client, name)
if containerErr != nil {
log.Printf("[Update] error removing %s, %s\n", name, containerErr)
http.Error(w, containerErr.Error(), http.StatusInternalServerError)
return
}
deployErr := deploy(ctx, req, client, cni)
if deployErr != nil {
log.Printf("[Update] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
return
}
}
}

View File

@ -0,0 +1,131 @@
// Copyright Weaveworks
// github.com/weaveworks/weave/net
package handlers
import (
"fmt"
"net"
"os"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
)
type Dev struct {
Name string `json:"Name,omitempty"`
MAC net.HardwareAddr `json:"MAC,omitempty"`
CIDRs []*net.IPNet `json:"CIDRs,omitempty"`
}
func linkToNetDev(link netlink.Link) (Dev, error) {
addrs, err := netlink.AddrList(link, netlink.FAMILY_V4)
if err != nil {
return Dev{}, err
}
netDev := Dev{Name: link.Attrs().Name, MAC: link.Attrs().HardwareAddr}
for _, addr := range addrs {
netDev.CIDRs = append(netDev.CIDRs, addr.IPNet)
}
return netDev, nil
}
// ConnectedToBridgeVethPeerIds returns peer indexes of veth links connected to
// the given bridge. The peer index is used to query from a container netns
// whether the container is connected to the bridge.
func ConnectedToBridgeVethPeerIds(bridgeName string) ([]int, error) {
var ids []int
br, err := netlink.LinkByName(bridgeName)
if err != nil {
return nil, err
}
links, err := netlink.LinkList()
if err != nil {
return nil, err
}
for _, link := range links {
if _, isveth := link.(*netlink.Veth); isveth && link.Attrs().MasterIndex == br.Attrs().Index {
peerID := link.Attrs().ParentIndex
if peerID == 0 {
// perhaps running on an older kernel where ParentIndex doesn't work.
// as fall-back, assume the peers are consecutive
peerID = link.Attrs().Index - 1
}
ids = append(ids, peerID)
}
}
return ids, nil
}
// Lookup the weave interface of a container
func GetWeaveNetDevs(processID int) ([]Dev, error) {
peerIDs, err := ConnectedToBridgeVethPeerIds("weave")
if err != nil {
return nil, err
}
return GetNetDevsByVethPeerIds(processID, peerIDs)
}
func GetNetDevsByVethPeerIds(processID int, peerIDs []int) ([]Dev, error) {
// Bail out if this container is running in the root namespace
netnsRoot, err := netns.GetFromPid(1)
if err != nil {
return nil, fmt.Errorf("unable to open root namespace: %s", err)
}
defer netnsRoot.Close()
netnsContainer, err := netns.GetFromPid(processID)
if err != nil {
// Unable to find a namespace for this process - just return nothing
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("unable to open process %d namespace: %s", processID, err)
}
defer netnsContainer.Close()
if netnsRoot.Equal(netnsContainer) {
return nil, nil
}
// convert list of peerIDs into a map for faster lookup
indexes := make(map[int]struct{})
for _, id := range peerIDs {
indexes[id] = struct{}{}
}
var netdevs []Dev
err = WithNetNS(netnsContainer, func() error {
links, err := netlink.LinkList()
if err != nil {
return err
}
for _, link := range links {
if _, found := indexes[link.Attrs().Index]; found {
netdev, err := linkToNetDev(link)
if err != nil {
return err
}
netdevs = append(netdevs, netdev)
}
}
return nil
})
return netdevs, err
}
// Get the weave bridge interface.
// NB: Should be called from the root network namespace.
func GetBridgeNetDev(bridgeName string) (Dev, error) {
link, err := netlink.LinkByName(bridgeName)
if err != nil {
return Dev{}, err
}
return linkToNetDev(link)
}