mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-08 16:06:47 +00:00
354 lines
9.3 KiB
Go
354 lines
9.3 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd"
|
|
"github.com/containerd/containerd/cio"
|
|
"github.com/containerd/containerd/containers"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/containerd/containerd/oci"
|
|
gocni "github.com/containerd/go-cni"
|
|
"github.com/distribution/reference"
|
|
"github.com/opencontainers/runtime-spec/specs-go"
|
|
"github.com/openfaas/faas-provider/types"
|
|
cninetwork "github.com/openfaas/faasd/pkg/cninetwork"
|
|
"github.com/openfaas/faasd/pkg/service"
|
|
"github.com/pkg/errors"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
)
|
|
|
|
const annotationLabelPrefix = "com.openfaas.annotations."
|
|
|
|
// MakeDeployHandler returns a handler to deploy a function
|
|
func MakeDeployHandler(client *containerd.Client, cni gocni.CNI, secretMountPath string, alwaysPull bool) func(w http.ResponseWriter, r *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
if r.Body == nil {
|
|
http.Error(w, "expected a body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
defer r.Body.Close()
|
|
|
|
body, _ := io.ReadAll(r.Body)
|
|
|
|
req := types.FunctionDeployment{}
|
|
err := json.Unmarshal(body, &req)
|
|
if err != nil {
|
|
log.Printf("[Deploy] - error parsing input: %s", err)
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
|
|
return
|
|
}
|
|
|
|
namespace := getRequestNamespace(req.Namespace)
|
|
|
|
// Check if namespace exists, and it has the openfaas label
|
|
valid, err := validNamespace(client.NamespaceService(), namespace)
|
|
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if !valid {
|
|
http.Error(w, "namespace not valid", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
namespaceSecretMountPath := getNamespaceSecretMountPath(secretMountPath, namespace)
|
|
err = validateSecrets(namespaceSecretMountPath, req.Secrets)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
name := req.Service
|
|
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
|
|
|
if err := preDeploy(client, 1); err != nil {
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
log.Printf("[Deploy] error deploying %s, error: %s\n", name, err)
|
|
return
|
|
}
|
|
|
|
if err := deploy(ctx, req, client, cni, namespaceSecretMountPath, alwaysPull); err != nil {
|
|
log.Printf("[Deploy] error deploying %s, error: %s\n", name, err)
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// prepull is an optimization which means an image can be pulled before a deployment
|
|
// request, since a deployment request first deletes the active function before
|
|
// trying to deploy a new one.
|
|
func prepull(ctx context.Context, req types.FunctionDeployment, client *containerd.Client, alwaysPull bool) (containerd.Image, error) {
|
|
start := time.Now()
|
|
r, err := reference.ParseNormalizedNamed(req.Image)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
imgRef := reference.TagNameOnly(r).String()
|
|
|
|
snapshotter := ""
|
|
if val, ok := os.LookupEnv("snapshotter"); ok {
|
|
snapshotter = val
|
|
}
|
|
|
|
image, err := service.PrepareImage(ctx, client, imgRef, snapshotter, alwaysPull)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "unable to pull image %s", imgRef)
|
|
}
|
|
|
|
size, _ := image.Size(ctx)
|
|
log.Printf("Image for: %s size: %d, took: %fs\n", image.Name(), size, time.Since(start).Seconds())
|
|
|
|
return image, nil
|
|
}
|
|
|
|
func deploy(ctx context.Context, req types.FunctionDeployment, client *containerd.Client, cni gocni.CNI, secretMountPath string, alwaysPull bool) error {
|
|
|
|
snapshotter := ""
|
|
if val, ok := os.LookupEnv("snapshotter"); ok {
|
|
snapshotter = val
|
|
}
|
|
|
|
image, err := prepull(ctx, req, client, alwaysPull)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
envs := prepareEnv(req.EnvProcess, req.EnvVars)
|
|
mounts := getOSMounts()
|
|
|
|
for _, secret := range req.Secrets {
|
|
mounts = append(mounts, specs.Mount{
|
|
Destination: path.Join("/var/openfaas/secrets", secret),
|
|
Type: "bind",
|
|
Source: path.Join(secretMountPath, secret),
|
|
Options: []string{"rbind", "ro"},
|
|
})
|
|
}
|
|
|
|
name := req.Service
|
|
|
|
labels, err := buildLabels(&req)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to apply labels to container: %s, error: %w", name, err)
|
|
}
|
|
|
|
var memory *specs.LinuxMemory
|
|
if req.Limits != nil && len(req.Limits.Memory) > 0 {
|
|
memory = &specs.LinuxMemory{}
|
|
|
|
qty, err := resource.ParseQuantity(req.Limits.Memory)
|
|
if err != nil {
|
|
log.Printf("error parsing (%q) as quantity: %s", req.Limits.Memory, err.Error())
|
|
}
|
|
v := qty.Value()
|
|
memory.Limit = &v
|
|
}
|
|
|
|
container, err := client.NewContainer(
|
|
ctx,
|
|
name,
|
|
containerd.WithImage(image),
|
|
containerd.WithSnapshotter(snapshotter),
|
|
containerd.WithNewSnapshot(name+"-snapshot", image),
|
|
containerd.WithNewSpec(oci.WithImageConfig(image),
|
|
oci.WithHostname(name),
|
|
oci.WithCapabilities([]string{"CAP_NET_RAW"}),
|
|
oci.WithMounts(mounts),
|
|
oci.WithEnv(envs),
|
|
withMemory(memory)),
|
|
containerd.WithContainerLabels(labels),
|
|
)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create container: %s, error: %w", name, err)
|
|
}
|
|
|
|
return createTask(ctx, container, cni)
|
|
|
|
}
|
|
|
|
// countFunctions returns the number of functions deployed along with a map with a count
|
|
// in each namespace
|
|
func countFunctions(client *containerd.Client) (int64, int64, error) {
|
|
count := int64(0)
|
|
namespaceCount := int64(0)
|
|
|
|
namespaces := ListNamespaces(client)
|
|
|
|
for _, namespace := range namespaces {
|
|
fns, err := ListFunctions(client, namespace)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
namespaceCount++
|
|
count += int64(len(fns))
|
|
}
|
|
|
|
return count, namespaceCount, nil
|
|
}
|
|
|
|
func buildLabels(request *types.FunctionDeployment) (map[string]string, error) {
|
|
labels := map[string]string{}
|
|
|
|
if request.Labels != nil {
|
|
for k, v := range *request.Labels {
|
|
labels[k] = v
|
|
}
|
|
}
|
|
|
|
if request.Annotations != nil {
|
|
for k, v := range *request.Annotations {
|
|
key := fmt.Sprintf("%s%s", annotationLabelPrefix, k)
|
|
if _, ok := labels[key]; !ok {
|
|
labels[key] = v
|
|
} else {
|
|
return nil, errors.New(fmt.Sprintf("Key %s cannot be used as a label due to a conflict with annotation prefix %s", k, annotationLabelPrefix))
|
|
}
|
|
}
|
|
}
|
|
|
|
return labels, nil
|
|
}
|
|
|
|
func createTask(ctx context.Context, container containerd.Container, cni gocni.CNI) error {
|
|
|
|
name := container.ID()
|
|
|
|
task, taskErr := container.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
|
|
|
|
if taskErr != nil {
|
|
return fmt.Errorf("unable to start task: %s, error: %w", name, taskErr)
|
|
}
|
|
|
|
log.Printf("Container ID: %s\tTask ID %s:\tTask PID: %d\t\n", name, task.ID(), task.Pid())
|
|
|
|
labels := map[string]string{}
|
|
_, err := cninetwork.CreateCNINetwork(ctx, cni, task, labels)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ip, err := cninetwork.GetIPAddress(name, task.Pid())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("%s has IP: %s.\n", name, ip)
|
|
|
|
if _, err := task.Wait(ctx); err != nil {
|
|
return errors.Wrapf(err, "Unable to wait for task to start: %s", name)
|
|
}
|
|
|
|
if startErr := task.Start(ctx); startErr != nil {
|
|
return errors.Wrapf(startErr, "Unable to start task: %s", name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func prepareEnv(envProcess string, reqEnvVars map[string]string) []string {
|
|
envs := []string{}
|
|
fprocessFound := false
|
|
fprocess := "fprocess=" + envProcess
|
|
if len(envProcess) > 0 {
|
|
fprocessFound = true
|
|
}
|
|
|
|
for k, v := range reqEnvVars {
|
|
if k == "fprocess" {
|
|
fprocessFound = true
|
|
fprocess = v
|
|
} else {
|
|
envs = append(envs, k+"="+v)
|
|
}
|
|
}
|
|
if fprocessFound {
|
|
envs = append(envs, fprocess)
|
|
}
|
|
return envs
|
|
}
|
|
|
|
// getOSMounts provides a mount for os-specific files such
|
|
// as the hosts file and resolv.conf
|
|
func getOSMounts() []specs.Mount {
|
|
// Prior to hosts_dir env-var, this value was set to
|
|
// os.Getwd()
|
|
hostsDir := "/var/lib/faasd"
|
|
if v, ok := os.LookupEnv("hosts_dir"); ok && len(v) > 0 {
|
|
hostsDir = v
|
|
}
|
|
|
|
mounts := []specs.Mount{}
|
|
mounts = append(mounts, specs.Mount{
|
|
Destination: "/etc/resolv.conf",
|
|
Type: "bind",
|
|
Source: path.Join(hostsDir, "resolv.conf"),
|
|
Options: []string{"rbind", "ro"},
|
|
})
|
|
|
|
mounts = append(mounts, specs.Mount{
|
|
Destination: "/etc/hosts",
|
|
Type: "bind",
|
|
Source: path.Join(hostsDir, "hosts"),
|
|
Options: []string{"rbind", "ro"},
|
|
})
|
|
return mounts
|
|
}
|
|
|
|
func validateSecrets(secretMountPath string, secrets []string) error {
|
|
for _, secret := range secrets {
|
|
if _, err := os.Stat(path.Join(secretMountPath, secret)); err != nil {
|
|
return fmt.Errorf("unable to find secret: %s", secret)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func withMemory(mem *specs.LinuxMemory) oci.SpecOpts {
|
|
return func(ctx context.Context, _ oci.Client, c *containers.Container, s *oci.Spec) error {
|
|
if mem != nil {
|
|
if s.Linux == nil {
|
|
s.Linux = &specs.Linux{}
|
|
}
|
|
if s.Linux.Resources == nil {
|
|
s.Linux.Resources = &specs.LinuxResources{}
|
|
}
|
|
if s.Linux.Resources.Memory == nil {
|
|
s.Linux.Resources.Memory = &specs.LinuxMemory{}
|
|
}
|
|
s.Linux.Resources.Memory.Limit = mem.Limit
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func preDeploy(client *containerd.Client, additional int64) error {
|
|
count, countNs, err := countFunctions(client)
|
|
log.Printf("Function count: %d, Namespace count: %d\n", count, countNs)
|
|
|
|
if err != nil {
|
|
return err
|
|
} else if count+additional > faasdMaxFunctions {
|
|
return fmt.Errorf("the OpenFaaS CE EULA allows %d/%d function(s), upgrade to faasd Pro to continue", faasdMaxFunctions, count+additional)
|
|
} else if countNs > faasdMaxNs {
|
|
return fmt.Errorf("the OpenFaaS CE EULA allows %d/%d namespace(s), upgrade to faasd Pro to continue", faasdMaxNs, countNs)
|
|
}
|
|
return nil
|
|
}
|