mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-18 20:16:36 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
48237e0b3c | |||
306313ed9a | |||
ff0cccf0dc | |||
52baca9d17 | |||
f76432f60a | |||
38f26b213f | |||
6c3fe813fd | |||
13d28bd2db | |||
f3f6225674 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,3 +1,4 @@
|
||||
/faasd
|
||||
hosts
|
||||
/resolv.conf
|
||||
.idea/
|
||||
|
@ -86,17 +86,17 @@ go build
|
||||
|
||||
```sh
|
||||
# For x86_64
|
||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.4/faasd" \
|
||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.5/faasd" \
|
||||
-o "/usr/local/bin/faasd" \
|
||||
&& sudo chmod a+x "/usr/local/bin/faasd"
|
||||
|
||||
# armhf
|
||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.4/faasd-armhf" \
|
||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.5/faasd-armhf" \
|
||||
-o "/usr/local/bin/faasd" \
|
||||
&& sudo chmod a+x "/usr/local/bin/faasd"
|
||||
|
||||
# arm64
|
||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.4/faasd-arm64" \
|
||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.5/faasd-arm64" \
|
||||
-o "/usr/local/bin/faasd" \
|
||||
&& sudo chmod a+x "/usr/local/bin/faasd"
|
||||
```
|
||||
|
@ -65,6 +65,7 @@ func runUp(_ *cobra.Command, _ []string) error {
|
||||
log.Printf("Supervisor init done in: %s\n", time.Since(start).String())
|
||||
|
||||
shutdownTimeout := time.Second * 1
|
||||
timeout := time.Second * 60
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
@ -85,6 +86,12 @@ func runUp(_ *cobra.Command, _ []string) error {
|
||||
})
|
||||
}()
|
||||
|
||||
go func() {
|
||||
wd, _ := os.Getwd()
|
||||
proxy := pkg.NewProxy(path.Join(wd, "hosts"), timeout)
|
||||
proxy.Start()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
@ -1,11 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
export ARCH="armv6l"
|
||||
echo "Downloading Go"
|
||||
|
||||
curl -SLsf https://dl.google.com/go/go1.12.14.linux-armv6l.tar.gz > go.tgz
|
||||
curl -SLsf https://dl.google.com/go/go1.12.14.linux-$ARCH.tar.gz --output /tmp/go.tgz
|
||||
sudo rm -rf /usr/local/go/
|
||||
sudo mkdir -p /usr/local/go/
|
||||
sudo tar -xvf go.tgz -C /usr/local/go/ --strip-components=1
|
||||
sudo tar -xvf /tmp/go.tgz -C /usr/local/go/ --strip-components=1
|
||||
|
||||
export GOPATH=$HOME/go/
|
||||
export PATH=$PATH:/usr/local/go/bin/
|
||||
|
@ -1,11 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
export ARCH="amd64"
|
||||
echo "Downloading Go"
|
||||
|
||||
curl -SLsf https://dl.google.com/go/go1.12.14.linux-amd64.tar.gz > go.tgz
|
||||
curl -SLsf https://dl.google.com/go/go1.12.14.linux-$ARCH.tar.gz --output /tmp/go.tgz
|
||||
sudo rm -rf /usr/local/go/
|
||||
sudo mkdir -p /usr/local/go/
|
||||
sudo tar -xvf go.tgz -C /usr/local/go/ --strip-components=1
|
||||
sudo tar -xvf /tmp/go.tgz -C /usr/local/go/ --strip-components=1
|
||||
|
||||
export GOPATH=$HOME/go/
|
||||
export PATH=$PATH:/usr/local/go/bin/
|
||||
|
102
pkg/proxy.go
Normal file
102
pkg/proxy.go
Normal file
@ -0,0 +1,102 @@
|
||||
package pkg
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewProxy(hosts string, timeout time.Duration) *Proxy {
|
||||
|
||||
return &Proxy{
|
||||
Hosts: hosts,
|
||||
Timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
type Proxy struct {
|
||||
Hosts string
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
func (p *Proxy) Start() error {
|
||||
tcp := 8080
|
||||
|
||||
http.DefaultClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
log.Printf("Starting faasd proxy on %d\n", tcp)
|
||||
data := struct{ host string }{
|
||||
host: "",
|
||||
}
|
||||
|
||||
fileData, fileErr := ioutil.ReadFile(p.Hosts)
|
||||
if fileErr != nil {
|
||||
return fileErr
|
||||
}
|
||||
|
||||
lines := strings.Split(string(fileData), "\n")
|
||||
for _, line := range lines {
|
||||
if strings.Index(line, "gateway") > -1 {
|
||||
data.host = line[:strings.Index(line, "\t")]
|
||||
}
|
||||
}
|
||||
fmt.Printf("Gateway: %s\n", data.host)
|
||||
|
||||
s := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", tcp),
|
||||
ReadTimeout: p.Timeout,
|
||||
WriteTimeout: p.Timeout,
|
||||
MaxHeaderBytes: 1 << 20, // Max header of 1MB
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query := ""
|
||||
if len(r.URL.RawQuery) > 0 {
|
||||
query = "?" + r.URL.RawQuery
|
||||
}
|
||||
|
||||
upstream := fmt.Sprintf("http://%s:8080%s%s", data.host, r.URL.Path, query)
|
||||
fmt.Printf("Forward to %s %s\n", upstream, data)
|
||||
|
||||
if r.Body != nil {
|
||||
defer r.Body.Close()
|
||||
}
|
||||
|
||||
wrapper := ioutil.NopCloser(r.Body)
|
||||
upReq, upErr := http.NewRequest(r.Method, upstream, wrapper)
|
||||
|
||||
if upErr != nil {
|
||||
log.Println(upErr)
|
||||
|
||||
http.Error(w, upErr.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
upRes, upResErr := http.DefaultClient.Do(upReq)
|
||||
|
||||
if upResErr != nil {
|
||||
log.Println(upResErr)
|
||||
|
||||
http.Error(w, upResErr.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
for k, v := range upRes.Header {
|
||||
w.Header().Set(k, v[0])
|
||||
}
|
||||
|
||||
w.WriteHeader(upRes.StatusCode)
|
||||
io.Copy(w, upRes.Body)
|
||||
|
||||
}),
|
||||
}
|
||||
|
||||
return s.ListenAndServe()
|
||||
}
|
117
pkg/service/service.go
Normal file
117
pkg/service/service.go
Normal file
@ -0,0 +1,117 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// Remove removes a container
|
||||
func Remove(ctx context.Context, client *containerd.Client, name string) error {
|
||||
|
||||
container, containerErr := client.LoadContainer(ctx, name)
|
||||
|
||||
if containerErr == nil {
|
||||
found := true
|
||||
t, err := container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if errdefs.IsNotFound(err) {
|
||||
found = false
|
||||
} else {
|
||||
return fmt.Errorf("unable to get task %s: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
status, _ := t.Status(ctx)
|
||||
fmt.Printf("Status of %s is: %s\n", name, status.Status)
|
||||
|
||||
log.Printf("Need to kill %s\n", name)
|
||||
err := killTask(ctx, t)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
|
||||
}
|
||||
}
|
||||
|
||||
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
|
||||
}
|
||||
} else {
|
||||
service := client.SnapshotService("")
|
||||
key := name + "snapshot"
|
||||
if _, err := client.SnapshotService("").Stat(ctx, key); err == nil {
|
||||
service.Remove(ctx, key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// From Stellar
|
||||
func killTask(ctx context.Context, task containerd.Task) error {
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
var err error
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if task != nil {
|
||||
wait, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error waiting on task: %s", err)
|
||||
return
|
||||
}
|
||||
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
|
||||
log.Printf("error killing container task: %s", err)
|
||||
}
|
||||
select {
|
||||
case <-wait:
|
||||
task.Delete(ctx)
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
||||
log.Printf("error force killing container task: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func PrepareImage(ctx context.Context, client *containerd.Client, imageName, snapshotter string) (containerd.Image, error) {
|
||||
|
||||
var empty containerd.Image
|
||||
image, err := client.GetImage(ctx, imageName)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
img, err := client.Pull(ctx, imageName, containerd.WithPullUnpack)
|
||||
if err != nil {
|
||||
return empty, fmt.Errorf("cannot pull: %s", err)
|
||||
}
|
||||
image = img
|
||||
}
|
||||
|
||||
unpacked, err := image.IsUnpacked(ctx, snapshotter)
|
||||
if err != nil {
|
||||
return empty, fmt.Errorf("cannot check if unpacked: %s", err)
|
||||
}
|
||||
|
||||
if !unpacked {
|
||||
if err := image.Unpack(ctx, snapshotter); err != nil {
|
||||
return empty, fmt.Errorf("cannot unpack: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return image, nil
|
||||
}
|
@ -8,15 +8,12 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/alexellis/faasd/pkg/service"
|
||||
"github.com/alexellis/faasd/pkg/weave"
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/oci"
|
||||
@ -48,7 +45,7 @@ func (s *Supervisor) Remove(svcs []Service) error {
|
||||
ctx := namespaces.WithNamespace(context.Background(), "default")
|
||||
|
||||
for _, svc := range svcs {
|
||||
err := removeContainer(ctx, s.client, svc.Name)
|
||||
err := service.Remove(ctx, s.client, svc.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -75,7 +72,7 @@ func (s *Supervisor) Start(svcs []Service) error {
|
||||
for _, svc := range svcs {
|
||||
fmt.Printf("Preparing: %s with image: %s\n", svc.Name, svc.Image)
|
||||
|
||||
img, err := prepareImage(ctx, s.client, svc.Image)
|
||||
img, err := service.PrepareImage(ctx, s.client, svc.Image, defaultSnapshotter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -87,13 +84,13 @@ func (s *Supervisor) Start(svcs []Service) error {
|
||||
for _, svc := range svcs {
|
||||
fmt.Printf("Reconciling: %s\n", svc.Name)
|
||||
|
||||
image := images[svc.Name]
|
||||
|
||||
containerErr := removeContainer(ctx, s.client, svc.Name)
|
||||
containerErr := service.Remove(ctx, s.client, svc.Name)
|
||||
if containerErr != nil {
|
||||
return containerErr
|
||||
}
|
||||
|
||||
image := images[svc.Name]
|
||||
|
||||
mounts := []specs.Mount{}
|
||||
if len(svc.Mounts) > 0 {
|
||||
for _, mnt := range svc.Mounts {
|
||||
@ -189,7 +186,6 @@ func (s *Supervisor) Start(svcs []Service) error {
|
||||
}
|
||||
log.Println("Exited: ", exitStatusC)
|
||||
|
||||
// call start on the task to execute the redis server
|
||||
if err := task.Start(ctx); err != nil {
|
||||
log.Println("Task err: ", err)
|
||||
return err
|
||||
@ -199,37 +195,6 @@ func (s *Supervisor) Start(svcs []Service) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func prepareImage(ctx context.Context, client *containerd.Client, imageName string) (containerd.Image, error) {
|
||||
snapshotter := defaultSnapshotter
|
||||
|
||||
var empty containerd.Image
|
||||
image, err := client.GetImage(ctx, imageName)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
img, err := client.Pull(ctx, imageName, containerd.WithPullUnpack)
|
||||
if err != nil {
|
||||
return empty, fmt.Errorf("cannot pull: %s", err)
|
||||
}
|
||||
image = img
|
||||
}
|
||||
|
||||
unpacked, err := image.IsUnpacked(ctx, snapshotter)
|
||||
if err != nil {
|
||||
return empty, fmt.Errorf("cannot check if unpacked: %s", err)
|
||||
}
|
||||
|
||||
if !unpacked {
|
||||
if err := image.Unpack(ctx, snapshotter); err != nil {
|
||||
return empty, fmt.Errorf("cannot unpack: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return image, nil
|
||||
}
|
||||
|
||||
func getIP(containerID string, taskPID uint32) string {
|
||||
// https://github.com/weaveworks/weave/blob/master/net/netdev.go
|
||||
|
||||
@ -274,70 +239,3 @@ func withOCIArgs(args []string) oci.SpecOpts {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// From Stellar
|
||||
func killTask(ctx context.Context, task containerd.Task) error {
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
var err error
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if task != nil {
|
||||
wait, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error waiting on task: %s", err)
|
||||
return
|
||||
}
|
||||
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
|
||||
log.Printf("error killing container task: %s", err)
|
||||
}
|
||||
select {
|
||||
case <-wait:
|
||||
task.Delete(ctx)
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
||||
log.Printf("error force killing container task: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func removeContainer(ctx context.Context, client *containerd.Client, name string) error {
|
||||
|
||||
container, containerErr := client.LoadContainer(ctx, name)
|
||||
|
||||
if containerErr == nil {
|
||||
found := true
|
||||
t, err := container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if errdefs.IsNotFound(err) {
|
||||
found = false
|
||||
} else {
|
||||
return fmt.Errorf("unable to get task %s: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
status, _ := t.Status(ctx)
|
||||
fmt.Printf("Status of %s is: %s\n", name, status.Status)
|
||||
|
||||
log.Printf("Need to kill %s\n", name)
|
||||
err := killTask(ctx, t)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
|
||||
}
|
||||
}
|
||||
|
||||
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ func DaemonReload() error {
|
||||
|
||||
func InstallUnit(name string) error {
|
||||
tmplName := "./hack/" + name + ".service"
|
||||
tmpl, err := template.ParseFiles()
|
||||
tmpl, err := template.ParseFiles(tmplName)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error loading template %s, error %s", tmplName, err)
|
||||
|
Reference in New Issue
Block a user