mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-20 13:06:38 +00:00
Compare commits
19 Commits
Author | SHA1 | Date | |
---|---|---|---|
fcfd0d6fc9 | |||
beb84a3775 | |||
52baca9d17 | |||
f76432f60a | |||
38f26b213f | |||
6c3fe813fd | |||
13d28bd2db | |||
f3f6225674 | |||
e4ed9e5b91 | |||
5a28f3e231 | |||
a042be5477 | |||
6230d3504e | |||
4ba3ec3b64 | |||
ea9386d285 | |||
03500c5649 | |||
867f8459b0 | |||
6737712b28 | |||
832893998d | |||
1732566748 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,3 +1,4 @@
|
|||||||
/faasd
|
/faasd
|
||||||
hosts
|
hosts
|
||||||
/resolv.conf
|
/resolv.conf
|
||||||
|
.idea/
|
||||||
|
73
README.md
73
README.md
@ -24,64 +24,84 @@ You can use the standard [faas-cli](https://github.com/openfaas/faas-cli) with f
|
|||||||
* `faas describe`
|
* `faas describe`
|
||||||
* `faas deploy --update=true --replace=false`
|
* `faas deploy --update=true --replace=false`
|
||||||
* `faas invoke`
|
* `faas invoke`
|
||||||
|
* `faas invoke --async`
|
||||||
|
|
||||||
Other operations are pending development in the provider.
|
Other operations are pending development in the provider.
|
||||||
|
|
||||||
### Pre-reqs
|
### Pre-reqs
|
||||||
|
|
||||||
* Linux - ideally Ubuntu, which is used for testing.
|
* Linux - ideally Ubuntu, which is used for testing
|
||||||
* Installation steps as per [faas-containerd](https://github.com/alexellis/faas-containerd) for building and for development
|
* Installation steps as per [faas-containerd](https://github.com/alexellis/faas-containerd) for building and for development
|
||||||
|
* [netns](https://github.com/genuinetools/netns/releases) binary in `$PATH`
|
||||||
|
* [containerd v1.3.2](https://github.com/containerd/containerd)
|
||||||
* [faas-cli](https://github.com/openfaas/faas-cli) (optional)
|
* [faas-cli](https://github.com/openfaas/faas-cli) (optional)
|
||||||
|
|
||||||
## Backlog
|
## Backlog
|
||||||
|
|
||||||
* Use CNI to create network namespaces and adapters
|
Pending:
|
||||||
* Inject / manage IPs between core components for service to service communication - i.e. so Prometheus can scrape the OpenFaaS gateway
|
|
||||||
* Monitor and restart any of the core components, if they crash
|
|
||||||
* Configure `basic_auth` to protect the OpenFaaS gateway and faas-containerd HTTP API
|
|
||||||
* Self-install / create systemd service on start-up using [go-systemd](https://github.com/coreos/go-systemd)
|
|
||||||
* Bundle/package/automate installation of containerd - [see bootstrap from k3s](https://github.com/rancher/k3s)
|
|
||||||
* Create [faasd.service](https://github.com/rancher/k3s/blob/master/k3s.service)
|
|
||||||
|
|
||||||
|
* [ ] Configure `basic_auth` to protect the OpenFaaS gateway and faas-containerd HTTP API
|
||||||
|
* [ ] Use CNI to create network namespaces and adapters
|
||||||
|
* [ ] Monitor and restart any of the core components at runtime if the container stops
|
||||||
|
* [ ] Bundle/package/automate installation of containerd - [see bootstrap from k3s](https://github.com/rancher/k3s)
|
||||||
|
* [ ] Provide ufw rules / example for blocking access to everything but a reverse proxy to the gateway container
|
||||||
|
|
||||||
## Hacking
|
Done:
|
||||||
|
|
||||||
|
* [x] Inject / manage IPs between core components for service to service communication - i.e. so Prometheus can scrape the OpenFaaS gateway - done via `/etc/hosts` mount
|
||||||
|
* [x] Add queue-worker and NATS
|
||||||
|
* [x] Create faasd.service and faas-containerd.service
|
||||||
|
* [x] Self-install / create systemd service via `faasd install`
|
||||||
|
* [x] Restart containers upon restart of faasd
|
||||||
|
* [x] Clear / remove containers and tasks with SIGTERM / SIGINT
|
||||||
|
* [x] Determine armhf/arm64 containers to run for gateway
|
||||||
|
|
||||||
|
## Hacking (build from source)
|
||||||
|
|
||||||
First run faas-containerd
|
First run faas-containerd
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
cd $GOPATH/src/github.com/alexellis/faas-containerd
|
cd $GOPATH/src/github.com/alexellis/faas-containerd
|
||||||
go build && sudo ./faas-containerd
|
|
||||||
|
# You'll need to install containerd and its pre-reqs first
|
||||||
|
# https://github.com/alexellis/faas-containerd/
|
||||||
|
|
||||||
|
sudo ./faas-containerd
|
||||||
```
|
```
|
||||||
|
|
||||||
Then run faasd, which brings up the gateway and Prometheus as containers
|
Then run faasd, which brings up the gateway and Prometheus as containers
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
cd $GOPATH/src/github.com/alexellis/faasd
|
cd $GOPATH/src/github.com/alexellis/faasd
|
||||||
go build && sudo ./faasd
|
go build
|
||||||
|
|
||||||
|
# Install with systemd
|
||||||
|
# sudo ./faasd install
|
||||||
|
|
||||||
|
# Or run interactively
|
||||||
|
# sudo ./faasd up
|
||||||
```
|
```
|
||||||
|
|
||||||
Or get from binaries:
|
### Build and run (binaries)
|
||||||
|
|
||||||
|
|
||||||
### Build and run
|
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
# For x86_64
|
# For x86_64
|
||||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.1/faasd" \
|
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.5/faasd" \
|
||||||
-o "/usr/local/bin/faasd" \
|
-o "/usr/local/bin/faasd" \
|
||||||
&& sudo chmod a+x "/usr/local/bin/faasd"
|
&& sudo chmod a+x "/usr/local/bin/faasd"
|
||||||
|
|
||||||
# armhf
|
# armhf
|
||||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.1/faasd-armhf" \
|
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.5/faasd-armhf" \
|
||||||
-o "/usr/local/bin/faasd" \
|
-o "/usr/local/bin/faasd" \
|
||||||
&& sudo chmod a+x "/usr/local/bin/faasd"
|
&& sudo chmod a+x "/usr/local/bin/faasd"
|
||||||
|
|
||||||
# arm64
|
# arm64
|
||||||
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.1/faasd-arm64" \
|
sudo curl -fSLs "https://github.com/alexellis/faasd/releases/download/0.2.5/faasd-arm64" \
|
||||||
-o "/usr/local/bin/faasd" \
|
-o "/usr/local/bin/faasd" \
|
||||||
&& sudo chmod a+x "/usr/local/bin/faasd"
|
&& sudo chmod a+x "/usr/local/bin/faasd"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### At run-time
|
||||||
|
|
||||||
Look in `hosts` in the current working folder to get the IP for the gateway or Prometheus
|
Look in `hosts` in the current working folder to get the IP for the gateway or Prometheus
|
||||||
|
|
||||||
@ -91,11 +111,25 @@ Look in `hosts` in the current working folder to get the IP for the gateway or P
|
|||||||
172.19.0.2 prometheus
|
172.19.0.2 prometheus
|
||||||
|
|
||||||
172.19.0.3 gateway
|
172.19.0.3 gateway
|
||||||
|
172.19.0.4 nats
|
||||||
|
172.19.0.5 queue-worker
|
||||||
```
|
```
|
||||||
|
|
||||||
Since faas-containerd uses containerd heavily it is not running as a container, but as a stand-alone process. Its port is available via the bridge interface, i.e. netns0.
|
Since faas-containerd uses containerd heavily it is not running as a container, but as a stand-alone process. Its port is available via the bridge interface, i.e. netns0.
|
||||||
|
|
||||||
Now go to the gateway's IP address as shown above on port 8080, i.e. http://172.19.0.3:8080 - you can also use this address to deploy OpenFaaS Functions via the `faas-cli`.
|
* Prometheus will run on the Prometheus IP plus port 8080 i.e. http://172.19.0.2:9090/targets
|
||||||
|
|
||||||
|
* faas-containerd runs on 172.19.0.1:8081
|
||||||
|
|
||||||
|
* Now go to the gateway's IP address as shown above on port 8080, i.e. http://172.19.0.3:8080 - you can also use this address to deploy OpenFaaS Functions via the `faas-cli`.
|
||||||
|
|
||||||
|
#### Installation with systemd
|
||||||
|
|
||||||
|
* `faasd install` - install faasd and containerd with systemd, run in `$GOPATH/src/github.com/alexellis/faasd`
|
||||||
|
* `journalctl -u faasd` - faasd systemd logs
|
||||||
|
* `journalctl -u faas-containerd` - faas-containerd systemd logs
|
||||||
|
|
||||||
|
### Appendix
|
||||||
|
|
||||||
Removing containers:
|
Removing containers:
|
||||||
|
|
||||||
@ -117,4 +151,3 @@ https://github.com/containernetworking/plugins
|
|||||||
|
|
||||||
https://github.com/containerd/go-cni
|
https://github.com/containerd/go-cni
|
||||||
|
|
||||||
|
|
||||||
|
@ -23,6 +23,11 @@ func runInstall(_ *cobra.Command, _ []string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = binExists("/usr/local/bin/", "faasd")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err = binExists("/usr/local/bin/", "netns")
|
err = binExists("/usr/local/bin/", "netns")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
11
cmd/up.go
11
cmd/up.go
@ -65,6 +65,7 @@ func runUp(_ *cobra.Command, _ []string) error {
|
|||||||
log.Printf("Supervisor init done in: %s\n", time.Since(start).String())
|
log.Printf("Supervisor init done in: %s\n", time.Since(start).String())
|
||||||
|
|
||||||
shutdownTimeout := time.Second * 1
|
shutdownTimeout := time.Second * 1
|
||||||
|
timeout := time.Second * 60
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -85,6 +86,12 @@ func runUp(_ *cobra.Command, _ []string) error {
|
|||||||
})
|
})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wd, _ := os.Getwd()
|
||||||
|
proxy := pkg.NewProxy(path.Join(wd, "hosts"), timeout)
|
||||||
|
proxy.Start()
|
||||||
|
}()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -124,7 +131,7 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
|
|||||||
"faas_nats_address=nats",
|
"faas_nats_address=nats",
|
||||||
"faas_nats_port=4222",
|
"faas_nats_port=4222",
|
||||||
},
|
},
|
||||||
Image: "docker.io/openfaas/gateway:0.18.7" + archSuffix,
|
Image: "docker.io/openfaas/gateway:0.18.8" + archSuffix,
|
||||||
Mounts: []pkg.Mount{},
|
Mounts: []pkg.Mount{},
|
||||||
Caps: []string{"CAP_NET_RAW"},
|
Caps: []string{"CAP_NET_RAW"},
|
||||||
},
|
},
|
||||||
@ -137,7 +144,7 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
|
|||||||
"faas_gateway_address=gateway",
|
"faas_gateway_address=gateway",
|
||||||
"ack_wait=5m5s",
|
"ack_wait=5m5s",
|
||||||
"max_inflight=1",
|
"max_inflight=1",
|
||||||
"faas_print_body=true",
|
"write_debug=false",
|
||||||
},
|
},
|
||||||
Image: "docker.io/openfaas/queue-worker:0.9.0",
|
Image: "docker.io/openfaas/queue-worker:0.9.0",
|
||||||
Mounts: []pkg.Mount{},
|
Mounts: []pkg.Mount{},
|
||||||
|
29
hack/build-containerd-armhf.sh
Executable file
29
hack/build-containerd-armhf.sh
Executable file
@ -0,0 +1,29 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export ARCH="armv6l"
|
||||||
|
echo "Downloading Go"
|
||||||
|
|
||||||
|
curl -SLsf https://dl.google.com/go/go1.12.14.linux-$ARCH.tar.gz --output /tmp/go.tgz
|
||||||
|
sudo rm -rf /usr/local/go/
|
||||||
|
sudo mkdir -p /usr/local/go/
|
||||||
|
sudo tar -xvf /tmp/go.tgz -C /usr/local/go/ --strip-components=1
|
||||||
|
|
||||||
|
export GOPATH=$HOME/go/
|
||||||
|
export PATH=$PATH:/usr/local/go/bin/
|
||||||
|
|
||||||
|
go version
|
||||||
|
|
||||||
|
echo "Building containerd"
|
||||||
|
|
||||||
|
mkdir -p $GOPATH/src/github.com/containerd
|
||||||
|
cd $GOPATH/src/github.com/containerd
|
||||||
|
git clone https://github.com/containerd/containerd
|
||||||
|
|
||||||
|
cd containerd
|
||||||
|
git fetch origin --tags
|
||||||
|
git checkout v1.3.2
|
||||||
|
|
||||||
|
make
|
||||||
|
sudo make install
|
||||||
|
|
||||||
|
sudo containerd --version
|
29
hack/build-containerd.sh
Normal file
29
hack/build-containerd.sh
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export ARCH="amd64"
|
||||||
|
echo "Downloading Go"
|
||||||
|
|
||||||
|
curl -SLsf https://dl.google.com/go/go1.12.14.linux-$ARCH.tar.gz --output /tmp/go.tgz
|
||||||
|
sudo rm -rf /usr/local/go/
|
||||||
|
sudo mkdir -p /usr/local/go/
|
||||||
|
sudo tar -xvf /tmp/go.tgz -C /usr/local/go/ --strip-components=1
|
||||||
|
|
||||||
|
export GOPATH=$HOME/go/
|
||||||
|
export PATH=$PATH:/usr/local/go/bin/
|
||||||
|
|
||||||
|
go version
|
||||||
|
|
||||||
|
echo "Building containerd"
|
||||||
|
|
||||||
|
mkdir -p $GOPATH/src/github.com/containerd
|
||||||
|
cd $GOPATH/src/github.com/containerd
|
||||||
|
git clone https://github.com/containerd/containerd
|
||||||
|
|
||||||
|
cd containerd
|
||||||
|
git fetch origin --tags
|
||||||
|
git checkout v1.3.2
|
||||||
|
|
||||||
|
make
|
||||||
|
sudo make install
|
||||||
|
|
||||||
|
sudo containerd --version
|
@ -4,7 +4,7 @@ After=faas-containerd.service
|
|||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
MemoryLimit=500M
|
MemoryLimit=500M
|
||||||
ExecStart={{.Cwd}}/faasd up
|
ExecStart=/usr/local/bin/faasd up
|
||||||
Restart=on-failure
|
Restart=on-failure
|
||||||
RestartSec=10s
|
RestartSec=10s
|
||||||
WorkingDirectory={{.Cwd}}
|
WorkingDirectory={{.Cwd}}
|
||||||
|
98
pkg/proxy.go
Normal file
98
pkg/proxy.go
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
package pkg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewProxy(hosts string, timeout time.Duration) *Proxy {
|
||||||
|
|
||||||
|
return &Proxy{
|
||||||
|
Hosts: hosts,
|
||||||
|
Timeout: timeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Proxy struct {
|
||||||
|
Hosts string
|
||||||
|
Timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Proxy) Start() error {
|
||||||
|
tcp := 8080
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
log.Printf("Starting faasd proxy on %d\n", tcp)
|
||||||
|
data := struct{ host string }{
|
||||||
|
host: "",
|
||||||
|
}
|
||||||
|
|
||||||
|
fileData, fileErr := ioutil.ReadFile(p.Hosts)
|
||||||
|
if fileErr != nil {
|
||||||
|
return fileErr
|
||||||
|
}
|
||||||
|
|
||||||
|
lines := strings.Split(string(fileData), "\n")
|
||||||
|
for _, line := range lines {
|
||||||
|
if strings.Index(line, "gateway") > -1 {
|
||||||
|
data.host = line[:strings.Index(line, "\t")]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Printf("Gateway: %s\n", data.host)
|
||||||
|
|
||||||
|
s := &http.Server{
|
||||||
|
Addr: fmt.Sprintf(":%d", tcp),
|
||||||
|
ReadTimeout: p.Timeout,
|
||||||
|
WriteTimeout: p.Timeout,
|
||||||
|
MaxHeaderBytes: 1 << 20, // Max header of 1MB
|
||||||
|
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
query := ""
|
||||||
|
if len(r.URL.RawQuery) > 0 {
|
||||||
|
query = "?" + r.URL.RawQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
upstream := fmt.Sprintf("http://%s:8080%s%s", data.host, r.URL.Path, query)
|
||||||
|
fmt.Printf("Forward to %s %s\n", upstream, data)
|
||||||
|
|
||||||
|
if r.Body != nil {
|
||||||
|
defer r.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
wrapper := ioutil.NopCloser(r.Body)
|
||||||
|
upReq, upErr := http.NewRequest(r.Method, upstream, wrapper)
|
||||||
|
|
||||||
|
if upErr != nil {
|
||||||
|
log.Println(upErr)
|
||||||
|
|
||||||
|
http.Error(w, upErr.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
upRes, upResErr := http.DefaultClient.Do(upReq)
|
||||||
|
|
||||||
|
if upResErr != nil {
|
||||||
|
log.Println(upResErr)
|
||||||
|
|
||||||
|
http.Error(w, upResErr.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range upRes.Header {
|
||||||
|
w.Header().Set(k, v[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(upRes.StatusCode)
|
||||||
|
io.Copy(w, upRes.Body)
|
||||||
|
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.ListenAndServe()
|
||||||
|
}
|
117
pkg/service/service.go
Normal file
117
pkg/service/service.go
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd"
|
||||||
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Remove removes a container
|
||||||
|
func Remove(ctx context.Context, client *containerd.Client, name string) error {
|
||||||
|
|
||||||
|
container, containerErr := client.LoadContainer(ctx, name)
|
||||||
|
|
||||||
|
if containerErr == nil {
|
||||||
|
found := true
|
||||||
|
t, err := container.Task(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
if errdefs.IsNotFound(err) {
|
||||||
|
found = false
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("unable to get task %s: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if found {
|
||||||
|
status, _ := t.Status(ctx)
|
||||||
|
fmt.Printf("Status of %s is: %s\n", name, status.Status)
|
||||||
|
|
||||||
|
log.Printf("Need to kill %s\n", name)
|
||||||
|
err := killTask(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
service := client.SnapshotService("")
|
||||||
|
key := name + "snapshot"
|
||||||
|
if _, err := client.SnapshotService("").Stat(ctx, key); err == nil {
|
||||||
|
service.Remove(ctx, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// From Stellar
|
||||||
|
func killTask(ctx context.Context, task containerd.Task) error {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
var err error
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if task != nil {
|
||||||
|
wait, err := task.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error waiting on task: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
|
||||||
|
log.Printf("error killing container task: %s", err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-wait:
|
||||||
|
task.Delete(ctx)
|
||||||
|
return
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
||||||
|
log.Printf("error force killing container task: %s", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func PrepareImage(ctx context.Context, client *containerd.Client, imageName, snapshotter string) (containerd.Image, error) {
|
||||||
|
|
||||||
|
var empty containerd.Image
|
||||||
|
image, err := client.GetImage(ctx, imageName)
|
||||||
|
if err != nil {
|
||||||
|
if !errdefs.IsNotFound(err) {
|
||||||
|
return empty, err
|
||||||
|
}
|
||||||
|
|
||||||
|
img, err := client.Pull(ctx, imageName, containerd.WithPullUnpack)
|
||||||
|
if err != nil {
|
||||||
|
return empty, fmt.Errorf("cannot pull: %s", err)
|
||||||
|
}
|
||||||
|
image = img
|
||||||
|
}
|
||||||
|
|
||||||
|
unpacked, err := image.IsUnpacked(ctx, snapshotter)
|
||||||
|
if err != nil {
|
||||||
|
return empty, fmt.Errorf("cannot check if unpacked: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !unpacked {
|
||||||
|
if err := image.Unpack(ctx, snapshotter); err != nil {
|
||||||
|
return empty, fmt.Errorf("cannot unpack: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return image, nil
|
||||||
|
}
|
@ -8,15 +8,12 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
|
"github.com/alexellis/faasd/pkg/service"
|
||||||
"github.com/alexellis/faasd/pkg/weave"
|
"github.com/alexellis/faasd/pkg/weave"
|
||||||
"github.com/containerd/containerd"
|
"github.com/containerd/containerd"
|
||||||
"github.com/containerd/containerd/cio"
|
"github.com/containerd/containerd/cio"
|
||||||
"github.com/containerd/containerd/containers"
|
"github.com/containerd/containerd/containers"
|
||||||
"github.com/containerd/containerd/errdefs"
|
|
||||||
"golang.org/x/sys/unix"
|
|
||||||
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
"github.com/containerd/containerd/oci"
|
"github.com/containerd/containerd/oci"
|
||||||
@ -48,7 +45,7 @@ func (s *Supervisor) Remove(svcs []Service) error {
|
|||||||
ctx := namespaces.WithNamespace(context.Background(), "default")
|
ctx := namespaces.WithNamespace(context.Background(), "default")
|
||||||
|
|
||||||
for _, svc := range svcs {
|
for _, svc := range svcs {
|
||||||
err := removeContainer(ctx, s.client, svc.Name)
|
err := service.Remove(ctx, s.client, svc.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -75,7 +72,7 @@ func (s *Supervisor) Start(svcs []Service) error {
|
|||||||
for _, svc := range svcs {
|
for _, svc := range svcs {
|
||||||
fmt.Printf("Preparing: %s with image: %s\n", svc.Name, svc.Image)
|
fmt.Printf("Preparing: %s with image: %s\n", svc.Name, svc.Image)
|
||||||
|
|
||||||
img, err := prepareImage(ctx, s.client, svc.Image)
|
img, err := service.PrepareImage(ctx, s.client, svc.Image, defaultSnapshotter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -87,13 +84,13 @@ func (s *Supervisor) Start(svcs []Service) error {
|
|||||||
for _, svc := range svcs {
|
for _, svc := range svcs {
|
||||||
fmt.Printf("Reconciling: %s\n", svc.Name)
|
fmt.Printf("Reconciling: %s\n", svc.Name)
|
||||||
|
|
||||||
image := images[svc.Name]
|
containerErr := service.Remove(ctx, s.client, svc.Name)
|
||||||
|
|
||||||
containerErr := removeContainer(ctx, s.client, svc.Name)
|
|
||||||
if containerErr != nil {
|
if containerErr != nil {
|
||||||
return containerErr
|
return containerErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
image := images[svc.Name]
|
||||||
|
|
||||||
mounts := []specs.Mount{}
|
mounts := []specs.Mount{}
|
||||||
if len(svc.Mounts) > 0 {
|
if len(svc.Mounts) > 0 {
|
||||||
for _, mnt := range svc.Mounts {
|
for _, mnt := range svc.Mounts {
|
||||||
@ -189,7 +186,6 @@ func (s *Supervisor) Start(svcs []Service) error {
|
|||||||
}
|
}
|
||||||
log.Println("Exited: ", exitStatusC)
|
log.Println("Exited: ", exitStatusC)
|
||||||
|
|
||||||
// call start on the task to execute the redis server
|
|
||||||
if err := task.Start(ctx); err != nil {
|
if err := task.Start(ctx); err != nil {
|
||||||
log.Println("Task err: ", err)
|
log.Println("Task err: ", err)
|
||||||
return err
|
return err
|
||||||
@ -199,37 +195,6 @@ func (s *Supervisor) Start(svcs []Service) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareImage(ctx context.Context, client *containerd.Client, imageName string) (containerd.Image, error) {
|
|
||||||
snapshotter := defaultSnapshotter
|
|
||||||
|
|
||||||
var empty containerd.Image
|
|
||||||
image, err := client.GetImage(ctx, imageName)
|
|
||||||
if err != nil {
|
|
||||||
if !errdefs.IsNotFound(err) {
|
|
||||||
return empty, err
|
|
||||||
}
|
|
||||||
|
|
||||||
img, err := client.Pull(ctx, imageName, containerd.WithPullUnpack)
|
|
||||||
if err != nil {
|
|
||||||
return empty, fmt.Errorf("cannot pull: %s", err)
|
|
||||||
}
|
|
||||||
image = img
|
|
||||||
}
|
|
||||||
|
|
||||||
unpacked, err := image.IsUnpacked(ctx, snapshotter)
|
|
||||||
if err != nil {
|
|
||||||
return empty, fmt.Errorf("cannot check if unpacked: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !unpacked {
|
|
||||||
if err := image.Unpack(ctx, snapshotter); err != nil {
|
|
||||||
return empty, fmt.Errorf("cannot unpack: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return image, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getIP(containerID string, taskPID uint32) string {
|
func getIP(containerID string, taskPID uint32) string {
|
||||||
// https://github.com/weaveworks/weave/blob/master/net/netdev.go
|
// https://github.com/weaveworks/weave/blob/master/net/netdev.go
|
||||||
|
|
||||||
@ -274,70 +239,3 @@ func withOCIArgs(args []string) oci.SpecOpts {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// From Stellar
|
|
||||||
func killTask(ctx context.Context, task containerd.Task) error {
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
var err error
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
if task != nil {
|
|
||||||
wait, err := task.Wait(ctx)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error waiting on task: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
|
|
||||||
log.Printf("error killing container task: %s", err)
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-wait:
|
|
||||||
task.Delete(ctx)
|
|
||||||
return
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
|
|
||||||
log.Printf("error force killing container task: %s", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func removeContainer(ctx context.Context, client *containerd.Client, name string) error {
|
|
||||||
|
|
||||||
container, containerErr := client.LoadContainer(ctx, name)
|
|
||||||
|
|
||||||
if containerErr == nil {
|
|
||||||
found := true
|
|
||||||
t, err := container.Task(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
if errdefs.IsNotFound(err) {
|
|
||||||
found = false
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("unable to get task %s: ", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if found {
|
|
||||||
status, _ := t.Status(ctx)
|
|
||||||
fmt.Printf("Status of %s is: %s\n", name, status.Status)
|
|
||||||
|
|
||||||
log.Printf("Need to kill %s\n", name)
|
|
||||||
err := killTask(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -65,8 +65,12 @@ func DaemonReload() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func InstallUnit(name string) error {
|
func InstallUnit(name string) error {
|
||||||
|
tmplName := "./hack/" + name + ".service"
|
||||||
|
tmpl, err := template.ParseFiles(tmplName)
|
||||||
|
|
||||||
tmpl, err := template.ParseFiles("./hack/" + name + ".service")
|
if err != nil {
|
||||||
|
return fmt.Errorf("error loading template %s, error %s", tmplName, err)
|
||||||
|
}
|
||||||
|
|
||||||
wd, _ := os.Getwd()
|
wd, _ := os.Getwd()
|
||||||
var tpl bytes.Buffer
|
var tpl bytes.Buffer
|
||||||
|
Reference in New Issue
Block a user