Compare commits

...

9 Commits
0.1.3 ... 0.2.0

Author SHA1 Message Date
3ee52c6ed7 Remove tasks and containers on SIGINT/SIGTERM
* Cleans-up and removes faasd containers/tasks when receiving
SIGINT/SIGTERM

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-24 10:09:56 +00:00
da16bdeee8 Extract services to method
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-24 09:58:24 +00:00
ad97b6db58 Add systemd utility package
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-24 09:12:34 +00:00
5bb68e15f5 Check for dependencies before installing
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-24 09:01:34 +00:00
0662605756 Bump latest gateway / queue-worker versions
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-23 21:18:06 +00:00
abc41d2108 Update queue-worker to print body
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-23 20:53:11 +00:00
9ba64783f9 Enable NATS, speed-up restart
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-23 20:43:00 +00:00
95e7f52a4f Set build version in Makefile
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-23 18:59:34 +00:00
8da2af9d96 Add Travis Build Status Badge
Signed-off-by: Utsav Anand <utsavanand2@gmail.com>
2019-12-23 18:25:46 +00:00
13 changed files with 430 additions and 113 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/faasd
hosts
/resolv.conf

2
Gopkg.lock generated
View File

@ -440,6 +440,7 @@
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/alexellis/go-execute",
"github.com/containerd/containerd",
"github.com/containerd/containerd/cio",
"github.com/containerd/containerd/containers",
@ -451,6 +452,7 @@
"github.com/spf13/cobra",
"github.com/vishvananda/netlink",
"github.com/vishvananda/netns",
"golang.org/x/sys/unix",
]
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -1,30 +1,3 @@
# Gopkg.toml example
#
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true
[[constraint]]
name = "github.com/containerd/containerd"
version = "1.3.2"
@ -37,6 +10,11 @@
name = "github.com/spf13/cobra"
version = "0.0.5"
[[constraint]]
name = "github.com/alexellis/go-execute"
version = "0.3.0"
[prune]
go-tests = true
unused-packages = true

View File

@ -1,6 +1,6 @@
Version := $(shell git describe --tags --dirty)
GitCommit := $(shell git rev-parse HEAD)
LDFLAGS := "-s -w -X main.Version=$(Version) -X main.GitCommit=$(GitCommit)"
LDFLAGS := "-s -w -X pkg.Version=$(Version) -X pkg.GitCommit=$(GitCommit)"
.PHONY: all
all: local
@ -11,5 +11,5 @@ local:
.PHONY: dist
dist:
CGO_ENABLED=0 GOOS=linux go build -ldflags $(LDFLAGS) -a -installsuffix cgo -o bin/faasd
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 go build -ldflags $(LDFLAGS) -a -installsuffix cgo -o bin/faasd-armhf
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -ldflags $(LDFLAGS) -a -installsuffix cgo -o bin/faasd-armhf
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags $(LDFLAGS) -a -installsuffix cgo -o bin/faasd-arm64

View File

@ -1,5 +1,7 @@
# faasd - serverless with containerd
[![Build Status](https://travis-ci.com/alexellis/faasd.svg?branch=master)](https://travis-ci.com/alexellis/faasd)
faasd is a Golang supervisor that bundles OpenFaaS for use with containerd instead of a container orchestrator like Kubernetes or Docker Swarm.
## About faasd:
@ -58,6 +60,29 @@ cd $GOPATH/src/github.com/alexellis/faasd
go build && sudo ./faasd
```
Or get from binaries:
### Build and run faas-containerd
```sh
# For x86_64
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.1.3/faasd" \
-o "/usr/local/bin/faasd" \
&& sudo chmod a+x "/usr/local/bin/faasd"
# armhf
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.1.3/faasd-armhf" \
-o "/usr/local/bin/faasd" \
&& sudo chmod a+x "/usr/local/bin/faasd"
# arm64
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.1.3/faasd-arm64" \
-o "/usr/local/bin/faasd" \
&& sudo chmod a+x "/usr/local/bin/faasd"
```
Look in `hosts` in the current working folder to get the IP for the gateway or Prometheus
```sh
@ -81,3 +106,15 @@ echo faas-containerd gateway prometheus |xargs sudo ctr container rm
echo faas-containerd gateway prometheus |xargs sudo ctr snapshot rm
```
## Links
https://github.com/renatofq/ctrofb/blob/31968e4b4893f3603e9998f21933c4131523bb5d/cmd/network.go
https://github.com/renatofq/catraia/blob/c4f62c86bddbfadbead38cd2bfe6d920fba26dce/catraia-net/network.go
https://github.com/containernetworking/plugins
https://github.com/containerd/go-cni

75
cmd/install.go Normal file
View File

@ -0,0 +1,75 @@
package cmd
import (
"fmt"
"os"
"path"
systemd "github.com/alexellis/faasd/pkg/systemd"
"github.com/spf13/cobra"
)
var installCmd = &cobra.Command{
Use: "install",
Short: "Install faasd",
RunE: runInstall,
}
func runInstall(_ *cobra.Command, _ []string) error {
err := binExists("/usr/local/bin/", "faas-containerd")
if err != nil {
return err
}
err = binExists("/usr/local/bin/", "netns")
if err != nil {
return err
}
err = systemd.InstallUnit("faas-containerd")
if err != nil {
return err
}
err = systemd.InstallUnit("faasd")
if err != nil {
return err
}
err = systemd.DaemonReload()
if err != nil {
return err
}
err = systemd.Enable("faas-containerd")
if err != nil {
return err
}
err = systemd.Enable("faasd")
if err != nil {
return err
}
err = systemd.Start("faas-containerd")
if err != nil {
return err
}
err = systemd.Start("faasd")
if err != nil {
return err
}
return nil
}
func binExists(folder, name string) error {
findPath := path.Join(folder, name)
if _, err := os.Stat(findPath); err != nil {
return fmt.Errorf("unable to stat %s, install this binary before continuing", findPath)
}
return nil
}

View File

@ -22,6 +22,7 @@ const WelcomeMessage = "Welcome to faasd"
func init() {
rootCommand.AddCommand(versionCmd)
rootCommand.AddCommand(upCmd)
rootCommand.AddCommand(installCmd)
}
var rootCommand = &cobra.Command{

117
cmd/up.go
View File

@ -1,9 +1,13 @@
package cmd
import (
"fmt"
"log"
"os"
"os/signal"
"path"
"sync"
"syscall"
"time"
"github.com/alexellis/faasd/pkg"
@ -18,20 +22,64 @@ var upCmd = &cobra.Command{
func runUp(_ *cobra.Command, _ []string) error {
services := makeServiceDefinitions()
start := time.Now()
supervisor, err := pkg.NewSupervisor("/run/containerd/containerd.sock")
if err != nil {
return err
}
log.Printf("Supervisor created in: %s\n", time.Since(start).String())
start = time.Now()
err = supervisor.Start(services)
if err != nil {
return err
}
defer supervisor.Close()
log.Printf("Supervisor init done in: %s\n", time.Since(start).String())
shutdownTimeout := time.Second * 1
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
log.Printf("faasd: waiting for SIGTERM or SIGINT\n")
<-sig
log.Printf("Signal received.. shutting down server in %s\n", shutdownTimeout.String())
err := supervisor.Remove(services)
if err != nil {
fmt.Println(err)
}
time.AfterFunc(shutdownTimeout, func() {
wg.Done()
})
}()
wg.Wait()
return nil
}
func makeServiceDefinitions() []pkg.Service {
wd, _ := os.Getwd()
svcs := []pkg.Service{
// pkg.Service{
// Name: "faas-containerd",
// Env: []string{"snapshotter=overlayfs"},
// Image: "docker.io/alexellis2/faas-containerd:0.3.2",
// Mounts: []pkg.Mount{
// pkg.Mount{
// Src: "/run/containerd/containerd.sock",
// Dest: "/run/containerd/containerd.sock",
// },
// },
// Caps: []string{"CAP_SYS_ADMIN", "CAP_NET_RAW"},
// },
return []pkg.Service{
pkg.Service{
Name: "nats",
Env: []string{""},
Image: "docker.io/library/nats-streaming:0.11.2",
Caps: []string{},
Args: []string{"/nats-streaming-server", "-m", "8222", "--store=memory", "--cluster_id=faas-cluster"},
},
pkg.Service{
Name: "prometheus",
Env: []string{},
@ -53,34 +101,27 @@ func runUp(_ *cobra.Command, _ []string) error {
"read_timeout=60s",
"write_timeout=60s",
"upstream_timeout=65s",
"faas_nats_address=nats",
"faas_nats_port=4222",
},
Image: "docker.io/openfaas/gateway:0.17.4",
Image: "docker.io/openfaas/gateway:0.18.7",
Mounts: []pkg.Mount{},
Caps: []string{"CAP_NET_RAW"},
},
pkg.Service{
Name: "queue-worker",
Env: []string{
"faas_nats_address=nats",
"faas_nats_port=4222",
"gateway_invoke=true",
"faas_gateway_address=gateway",
"ack_wait=5m5s",
"max_inflight=1",
"faas_print_body=true",
},
Image: "docker.io/openfaas/queue-worker:0.9.0",
Mounts: []pkg.Mount{},
Caps: []string{"CAP_NET_RAW"},
},
}
start := time.Now()
supervisor, err := pkg.NewSupervisor("/run/containerd/containerd.sock")
if err != nil {
return err
}
log.Printf("Supervisor created in: %s\n", time.Since(start).String())
start = time.Now()
err = supervisor.Start(svcs)
if err != nil {
return err
}
defer supervisor.Close()
log.Printf("Supervisor init done in: %s\n", time.Since(start).String())
time.Sleep(time.Minute * 120)
return nil
}

View File

@ -0,0 +1,12 @@
[Unit]
Description=faasd-containerd
[Service]
MemoryLimit=500M
ExecStart=/usr/local/bin/faas-containerd
Restart=on-failure
RestartSec=10s
WorkingDirectory=/usr/local/bin/
[Install]
WantedBy=multi-user.target

13
hack/faasd.service Normal file
View File

@ -0,0 +1,13 @@
[Unit]
Description=faasd
After=faas-containerd.service
[Service]
MemoryLimit=500M
ExecStart={{.Cwd}}/faasd up
Restart=on-failure
RestartSec=10s
WorkingDirectory={{.Cwd}}
[Install]
WantedBy=multi-user.target

View File

@ -8,7 +8,7 @@ import (
"os"
"os/exec"
"path"
"syscall"
"sync"
"time"
"github.com/alexellis/faasd/pkg/weave"
@ -16,6 +16,7 @@ import (
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
@ -43,6 +44,18 @@ func (s *Supervisor) Close() {
defer s.client.Close()
}
func (s *Supervisor) Remove(svcs []Service) error {
ctx := namespaces.WithNamespace(context.Background(), "default")
for _, svc := range svcs {
err := removeContainer(ctx, s.client, svc.Name)
if err != nil {
return err
}
}
return nil
}
func (s *Supervisor) Start(svcs []Service) error {
ctx := namespaces.WithNamespace(context.Background(), "default")
@ -60,7 +73,7 @@ func (s *Supervisor) Start(svcs []Service) error {
images := map[string]containerd.Image{}
for _, svc := range svcs {
fmt.Printf("Preparing: %s", svc.Name)
fmt.Printf("Preparing: %s with image: %s\n", svc.Name, svc.Image)
img, err := prepareImage(ctx, s.client, svc.Image)
if err != nil {
@ -69,7 +82,6 @@ func (s *Supervisor) Start(svcs []Service) error {
images[svc.Name] = img
size, _ := img.Size(ctx)
fmt.Printf("Prepare done for: %s, %d bytes\n", svc.Image, size)
}
for _, svc := range svcs {
@ -77,44 +89,9 @@ func (s *Supervisor) Start(svcs []Service) error {
image := images[svc.Name]
container, containerErr := s.client.LoadContainer(ctx, svc.Name)
if containerErr == nil {
found := true
t, err := container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
found = false
} else {
return fmt.Errorf("unable to get task %s: ", err)
}
}
if found {
status, _ := t.Status(ctx)
fmt.Println("Status:", status.Status)
if status.Status == containerd.Running {
log.Println("need to kill", svc.Name)
err = t.Kill(ctx, syscall.SIGTERM)
if err != nil {
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), svc.Name, err)
}
time.Sleep(5 * time.Second)
}
_, err = t.Delete(ctx)
if err != nil {
return fmt.Errorf("error deleting task %s, %s, %s", container.ID(), svc.Name, err)
}
}
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
if err != nil {
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), svc.Name, err)
}
containerErr := removeContainer(ctx, s.client, svc.Name)
if containerErr != nil {
return containerErr
}
mounts := []specs.Mount{}
@ -173,6 +150,7 @@ func (s *Supervisor) Start(svcs []Service) error {
containerd.WithNewSpec(oci.WithImageConfig(image),
oci.WithCapabilities(svc.Caps),
oci.WithMounts(mounts),
withOCIArgs(svc.Args),
hook,
oci.WithEnv(svc.Env)),
)
@ -182,7 +160,7 @@ func (s *Supervisor) Start(svcs []Service) error {
return containerCreateErr
}
fmt.Println("created", newContainer.ID())
log.Printf("Created container %s\n", newContainer.ID())
task, err := newContainer.NewTask(ctx, cio.NewCreator(cio.WithStdio))
if err != nil {
@ -190,7 +168,7 @@ func (s *Supervisor) Start(svcs []Service) error {
return err
}
ip := getIP(container.ID(), task.Pid())
ip := getIP(newContainer.ID(), task.Pid())
hosts, _ := ioutil.ReadFile("hosts")
@ -200,7 +178,7 @@ func (s *Supervisor) Start(svcs []Service) error {
writeErr := ioutil.WriteFile("hosts", hosts, 0644)
if writeErr != nil {
fmt.Println("Error writing hosts file")
log.Println("Error writing hosts file")
}
// os.Chown("hosts", 101, 101)
@ -277,9 +255,89 @@ type Service struct {
Name string
Mounts []Mount
Caps []string
Args []string
}
type Mount struct {
Src string
Dest string
}
func withOCIArgs(args []string) oci.SpecOpts {
if len(args) > 0 {
return oci.WithProcessArgs(args...)
}
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *oci.Spec) error {
return nil
}
}
// From Stellar
func killTask(ctx context.Context, task containerd.Task) error {
wg := &sync.WaitGroup{}
wg.Add(1)
var err error
go func() {
defer wg.Done()
if task != nil {
wait, err := task.Wait(ctx)
if err != nil {
err = fmt.Errorf("error waiting on task: %s", err)
return
}
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
log.Printf("error killing container task: %s", err)
}
select {
case <-wait:
task.Delete(ctx)
return
case <-time.After(5 * time.Second):
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
log.Printf("error force killing container task: %s", err)
}
return
}
}
}()
wg.Wait()
return err
}
func removeContainer(ctx context.Context, client *containerd.Client, name string) error {
container, containerErr := client.LoadContainer(ctx, name)
if containerErr == nil {
found := true
t, err := container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
found = false
} else {
return fmt.Errorf("unable to get task %s: ", err)
}
}
if found {
status, _ := t.Status(ctx)
fmt.Printf("Status of %s is: %s\n", name, status.Status)
log.Printf("Need to kill %s\n", name)
err := killTask(ctx, t)
if err != nil {
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
}
}
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
if err != nil {
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
}
}
return nil
}

100
pkg/systemd/systemd.go Normal file
View File

@ -0,0 +1,100 @@
package systemd
import (
"bytes"
"fmt"
"os"
"path/filepath"
"text/template"
execute "github.com/alexellis/go-execute/pkg/v1"
)
func Enable(unit string) error {
task := execute.ExecTask{Command: "systemctl",
Args: []string{"enable", unit},
StreamStdio: false,
}
res, err := task.Execute()
if err != nil {
return err
}
if res.ExitCode != 0 {
return fmt.Errorf("error executing task %s %v, stderr: %s", task.Command, task.Args, res.Stderr)
}
return nil
}
func Start(unit string) error {
task := execute.ExecTask{Command: "systemctl",
Args: []string{"start", unit},
StreamStdio: false,
}
res, err := task.Execute()
if err != nil {
return err
}
if res.ExitCode != 0 {
return fmt.Errorf("error executing task %s %v, stderr: %s", task.Command, task.Args, res.Stderr)
}
return nil
}
func DaemonReload() error {
task := execute.ExecTask{Command: "systemctl",
Args: []string{"daemon-reload"},
StreamStdio: false,
}
res, err := task.Execute()
if err != nil {
return err
}
if res.ExitCode != 0 {
return fmt.Errorf("error executing task %s %v, stderr: %s", task.Command, task.Args, res.Stderr)
}
return nil
}
func InstallUnit(name string) error {
tmpl, err := template.ParseFiles("./hack/" + name + ".service")
wd, _ := os.Getwd()
var tpl bytes.Buffer
userData := struct {
Cwd string
}{
Cwd: wd,
}
err = tmpl.Execute(&tpl, userData)
if err != nil {
return err
}
err = writeUnit(name+".service", tpl.Bytes())
if err != nil {
return err
}
return nil
}
func writeUnit(name string, data []byte) error {
f, err := os.Create(filepath.Join("/lib/systemd/system", name))
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(data)
return err
}

View File

@ -1,2 +1 @@
nameserver 8.8.8.8
nameserver 8.8.8.8