Mirror of https://github.com/openfaas/faasd.git (synced 2025-06-19 20:46:40 +00:00)

Compare commits (12 commits):

b20e5614c7, 40829bbf88, 87f49b0289, b817479828, faae82aa1c, cddc10acbe, 1c8e8bb615, 6e537d1fde, c314af4f98, 4189cfe52c, 9e2f571cf7, 93825e8354
README.md (78 changed lines)

````diff
@@ -7,7 +7,7 @@ faasd is built for everyone else, for those who have no desire to manage expensi
 [](https://www.openfaas.com)
 
 
-faasd is [OpenFaaS](https://github.com/openfaas/) reimagined, but without the cost and complexity of Kubernetes. It runs on a single host with very modest requirements, making it very fast and easy to manage. Under the hood it uses [containerd](https://containerd.io/) and [Container Networking Interface (CNI)](https://github.com/containernetworking/cni) along with the same core OpenFaaS components from the main project.
+faasd is [OpenFaaS](https://github.com/openfaas/) reimagined, but without the cost and complexity of Kubernetes. It runs on a single host with very modest requirements, making it fast and easy to manage. Under the hood it uses [containerd](https://containerd.io/) and [Container Networking Interface (CNI)](https://github.com/containernetworking/cni) along with the same core OpenFaaS components from the main project.
 
 ## When should you use faasd over OpenFaaS on Kubernetes?
 
@@ -56,6 +56,8 @@ Automate everything within < 60 seconds and get a public URL and IP address back
 * [Provision faasd on DigitalOcean with built-in TLS support](docs/bootstrap/digitalocean-terraform/README.md)
 
+## Operational concerns
+
 ### A note on private repos / registries
 
 To use private image repos, `~/.docker/config.json` needs to be copied to `/var/lib/faasd/.docker/config.json`.
````
````diff
@@ -87,6 +89,65 @@ journalctl -t openfaas-fn:figlet -f &
 echo logs | faas-cli invoke figlet
 ```
 
+### Logs for the core services
+
+Core services as defined in the docker-compose.yaml file are deployed as containers by faasd.
+
+View the logs for a component by giving its NAME:
+
+```bash
+journalctl -t default:NAME
+
+journalctl -t default:gateway
+
+journalctl -t default:queue-worker
+```
+
+You can also use `-f` to follow the logs, or `--lines` to tail a number of lines, or `--since` to give a timeframe.
+
+### Exposing core services
+
+The OpenFaaS stack is made up of several core services including NATS and Prometheus. You can expose these through the `docker-compose.yaml` file located at `/var/lib/faasd`.
+
+Expose the gateway to all adapters:
+
+```yaml
+  gateway:
+    ports:
+       - "8080:8080"
+```
+
+Expose Prometheus only to 127.0.0.1:
+
+```yaml
+  prometheus:
+    ports:
+       - "127.0.0.1:9090:9090"
+```
+
+### Upgrading faasd
+
+To upgrade `faasd` either re-create your VM using Terraform, or simply replace the faasd binary with a newer one.
+
+```bash
+systemctl stop faasd-provider
+systemctl stop faasd
+
+# Replace /usr/local/bin/faasd with the desired release
+
+# Replace /var/lib/faasd/docker-compose.yaml with the matching version for
+# that release.
+# Remember to keep any custom patches you make such as exposing additional
+# ports, or updating timeout values
+
+systemctl start faasd
+systemctl start faasd-provider
+```
+
+You could also perform this task over SSH, or use a configuration management tool.
+
+> Note: if you are using Caddy or Let's Encrypt for free SSL certificates, you may hit rate-limits for generating new certificates if you do this too often within a given week.
+
 ## What does faasd deploy?
 
 * faasd - itself, and its [faas-provider](https://github.com/openfaas/faas-provider) for containerd - CRUD for functions and services, implements the OpenFaaS REST API
````
````diff
@@ -119,7 +180,15 @@ For community functions see `faas-cli store --help`
 
 For templates built by the community see: `faas-cli template store list`, you can also use the `dockerfile` template if you just want to migrate an existing service without the benefits of using a template.
 
-### Workshop
+### Training and courses
+
+#### LinuxFoundation training course
+
+The founder of faasd and OpenFaaS has written a training course for the LinuxFoundation which also covers how to use OpenFaaS on Kubernetes. Much of the same concepts can be applied to faasd, and the course is free:
+
+* [Introduction to Serverless on Kubernetes](https://www.edx.org/course/introduction-to-serverless-on-kubernetes)
+
+#### Community workshop
 
 [The OpenFaaS workshop](https://github.com/openfaas/workshop/) is a set of 12 self-paced labs and provides a great starting point for learning the features of openfaas. Not all features will be available or usable with faasd.
 
@@ -184,4 +253,9 @@ Other operations are pending development in the provider such as:
 * [x] Configure `basic_auth` to protect the OpenFaaS gateway and faasd-provider HTTP API
 * [x] Setup custom working directory for faasd `/var/lib/faasd/`
 * [x] Use CNI to create network namespaces and adapters
+* [x] Optionally expose core services from the docker-compose.yaml file, locally or to all adapters.
+
+WIP:
+
+* [ ] Annotation support (PR ready)
+* [ ] Hard memory limits for functions (PR ready)
````
cmd/up.go (54 changed lines)

````diff
@@ -7,7 +7,6 @@ import (
 	"os"
 	"os/signal"
 	"path"
-	"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -72,20 +71,15 @@ func runUp(cmd *cobra.Command, _ []string) error {
 	log.Printf("Supervisor created in: %s\n", time.Since(start).String())
 
 	start = time.Now()
-	err = supervisor.Start(services)
-
-	if err != nil {
+	if err := supervisor.Start(services); err != nil {
 		return err
 	}
 
 	defer supervisor.Close()
 
 	log.Printf("Supervisor init done in: %s\n", time.Since(start).String())
 
 	shutdownTimeout := time.Second * 1
 	timeout := time.Second * 60
-	proxyDoneCh := make(chan bool)
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -102,38 +96,38 @@ func runUp(cmd *cobra.Command, _ []string) error {
 			fmt.Println(err)
 		}
 
-		// Close proxy
-		proxyDoneCh <- true
+		// TODO: close proxies
 		time.AfterFunc(shutdownTimeout, func() {
 			wg.Done()
 		})
 	}()
 
-	gatewayURLChan := make(chan string, 1)
-	proxyPort := 8080
-	proxy := pkg.NewProxy(proxyPort, timeout)
-	go proxy.Start(gatewayURLChan, proxyDoneCh)
-
-	go func() {
-		time.Sleep(3 * time.Second)
-
-		fileData, fileErr := ioutil.ReadFile(path.Join(cfg.workingDir, "hosts"))
-		if fileErr != nil {
-			log.Println(fileErr)
-			return
-		}
-
-		host := ""
-		lines := strings.Split(string(fileData), "\n")
-		for _, line := range lines {
-			if strings.Index(line, "gateway") > -1 {
-				host = line[:strings.Index(line, "\t")]
-			}
-		}
-		log.Printf("[up] Sending %s to proxy\n", host)
-		gatewayURLChan <- host + ":8080"
-		close(gatewayURLChan)
-	}()
+	localResolver := pkg.NewLocalResolver(path.Join(cfg.workingDir, "hosts"))
+	go localResolver.Start()
+
+	proxies := map[uint32]*pkg.Proxy{}
+	for _, svc := range services {
+		for _, port := range svc.Ports {
+
+			listenPort := port.Port
+			if _, ok := proxies[listenPort]; ok {
+				return fmt.Errorf("port %d already allocated", listenPort)
+			}
+
+			hostIP := "0.0.0.0"
+			if len(port.HostIP) > 0 {
+				hostIP = port.HostIP
+			}
+
+			upstream := fmt.Sprintf("%s:%d", svc.Name, port.TargetPort)
+			proxies[listenPort] = pkg.NewProxy(upstream, listenPort, hostIP, timeout, localResolver)
+		}
+	}
+
+	// TODO: track proxies for later cancellation when receiving sigint/term
+	for _, v := range proxies {
+		go v.Start()
+	}
 
 	wg.Wait()
 	return nil
````
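The rewritten `runUp` above replaces the single hard-coded gateway proxy with one proxy per published port, keyed by the listen port and resolved through the hosts file. Below is a minimal, self-contained sketch of that wiring; the `Service`/`ServicePort` structs, the sample services and the printed output are illustrative stand-ins rather than the faasd types themselves.

```go
package main

import (
	"fmt"
	"log"
)

// Illustrative stand-ins for the Service and ServicePort types in this diff;
// they mirror the fields used by runUp but are not the faasd types.
type ServicePort struct {
	TargetPort uint32
	Port       uint32
	HostIP     string
}

type Service struct {
	Name  string
	Ports []ServicePort
}

func main() {
	services := []Service{
		{Name: "gateway", Ports: []ServicePort{{TargetPort: 8080, Port: 8080}}},
		{Name: "prometheus", Ports: []ServicePort{{TargetPort: 9090, Port: 9090, HostIP: "127.0.0.1"}}},
	}

	// One proxy per published port, keyed by the host listen port so that a
	// duplicate port in the compose file fails fast instead of racing.
	type proxySpec struct {
		upstream string
		hostIP   string
	}
	proxies := map[uint32]proxySpec{}

	for _, svc := range services {
		for _, port := range svc.Ports {
			if _, ok := proxies[port.Port]; ok {
				log.Fatalf("port %d already allocated", port.Port)
			}

			hostIP := "0.0.0.0"
			if len(port.HostIP) > 0 {
				hostIP = port.HostIP
			}

			proxies[port.Port] = proxySpec{
				upstream: fmt.Sprintf("%s:%d", svc.Name, port.TargetPort),
				hostIP:   hostIP,
			}
		}
	}

	// Where runUp would call go proxy.Start() for each entry, this sketch
	// just prints the bindings it would create.
	for listenPort, p := range proxies {
		fmt.Printf("proxy %s:%d -> %s\n", p.hostIP, listenPort, p.upstream)
	}
}
```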
````diff
@@ -1,7 +1,7 @@
 version: "3.7"
 services:
   basic-auth-plugin:
-    image: "docker.io/openfaas/basic-auth-plugin:0.18.17${ARCH_SUFFIX}"
+    image: "docker.io/openfaas/basic-auth-plugin:0.18.18${ARCH_SUFFIX}"
     environment:
       - port=8080
       - secret_mount_path=/run/secrets
@@ -26,6 +26,8 @@ services:
       - "8222"
       - "--store=memory"
       - "--cluster_id=faas-cluster"
+    # ports:
+    #    - "127.0.0.1:8222:8222"
 
   prometheus:
     image: docker.io/prom/prometheus:v2.14.0
@@ -35,9 +37,11 @@ services:
         target: /etc/prometheus/prometheus.yml
     cap_add:
       - CAP_NET_RAW
+    ports:
+       - "127.0.0.1:9090:9090"
 
   gateway:
-    image: "docker.io/openfaas/gateway:0.18.17${ARCH_SUFFIX}"
+    image: "docker.io/openfaas/gateway:0.18.18${ARCH_SUFFIX}"
     environment:
       - basic_auth=true
       - functions_provider_url=http://faasd-provider:8081/
@@ -65,6 +69,8 @@ services:
       - basic-auth-plugin
       - nats
       - prometheus
+    ports:
+       - "8080:8080"
 
   queue-worker:
     image: docker.io/openfaas/queue-worker:0.11.2
````
pkg/local_resolver.go (new file, 104 lines)

```go
package pkg

import (
	"io/ioutil"
	"log"
	"os"
	"strings"
	"sync"
	"time"
)

// LocalResolver provides hostname to IP look-up for faasd core services
type LocalResolver struct {
	Path  string
	Map   map[string]string
	Mutex *sync.RWMutex
}

// NewLocalResolver creates a new resolver for reading from a hosts file
func NewLocalResolver(path string) Resolver {
	return &LocalResolver{
		Path:  path,
		Mutex: &sync.RWMutex{},
		Map:   make(map[string]string),
	}
}

// Start polling the disk for the hosts file in Path
func (l *LocalResolver) Start() {
	var lastStat os.FileInfo

	for {
		rebuild := false
		if info, err := os.Stat(l.Path); err == nil {
			if lastStat == nil {
				rebuild = true
			} else {
				if !lastStat.ModTime().Equal(info.ModTime()) {
					rebuild = true
				}
			}
			lastStat = info
		}

		if rebuild {
			log.Printf("Resolver rebuilding map")
			l.rebuild()
		}
		time.Sleep(time.Second * 3)
	}
}

func (l *LocalResolver) rebuild() {
	l.Mutex.Lock()
	defer l.Mutex.Unlock()

	fileData, fileErr := ioutil.ReadFile(l.Path)
	if fileErr != nil {
		log.Printf("resolver rebuild error: %s", fileErr.Error())
		return
	}

	lines := strings.Split(string(fileData), "\n")

	for _, line := range lines {
		index := strings.Index(line, "\t")

		if len(line) > 0 && index > -1 {
			ip := line[:index]
			host := line[index+1:]
			log.Printf("Resolver: %q=%q", host, ip)
			l.Map[host] = ip
		}
	}
}

// Get resolves a hostname to an IP, or timesout after the duration has passed
func (l *LocalResolver) Get(upstream string, got chan<- string, timeout time.Duration) {
	start := time.Now()
	for {
		if val := l.get(upstream); len(val) > 0 {
			got <- val
			break
		}

		if time.Now().After(start.Add(timeout)) {
			log.Printf("Timed out after %s getting host %q", timeout.String(), upstream)
			break
		}

		time.Sleep(time.Millisecond * 250)
	}
}

func (l *LocalResolver) get(upstream string) string {
	l.Mutex.RLock()
	defer l.Mutex.RUnlock()

	if val, ok := l.Map[upstream]; ok {
		return val
	}

	return ""
}
```
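`LocalResolver.Get` is asynchronous: it delivers the IP on the supplied channel once the polled hosts file contains a matching `IP<TAB>hostname` entry, or gives up after the timeout. A small usage sketch follows; it assumes the `pkg` package above is importable as `github.com/openfaas/faasd/pkg` (the import path used elsewhere in this changeset), and the temporary directory and sample IP are made up for the example.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"time"

	"github.com/openfaas/faasd/pkg"
)

func main() {
	dir, _ := ioutil.TempDir("", "hosts-example")
	defer os.RemoveAll(dir)

	// One "IP<TAB>hostname" entry per line, the format parsed by rebuild above.
	hostsFile := path.Join(dir, "hosts")
	_ = ioutil.WriteFile(hostsFile, []byte("10.62.0.10\tgateway\n"), 0644)

	resolver := pkg.NewLocalResolver(hostsFile)
	go resolver.Start() // polls the file every few seconds and rebuilds the map

	got := make(chan string, 1)
	go resolver.Get("gateway", got, time.Second*5)

	fmt.Println("gateway resolves to:", <-got)
}
```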
````diff
@@ -69,6 +69,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 	if err != nil {
 		return err
 	}
+
 	imgRef := reference.TagNameOnly(r).String()
 
 	snapshotter := ""
@@ -98,6 +99,11 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 
 	name := req.Service
 
+	labels := map[string]string{}
+	if req.Labels != nil {
+		labels = *req.Labels
+	}
+
 	container, err := client.NewContainer(
 		ctx,
 		name,
@@ -108,7 +114,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 			oci.WithCapabilities([]string{"CAP_NET_RAW"}),
 			oci.WithMounts(mounts),
 			oci.WithEnv(envs)),
-		containerd.WithContainerLabels(*req.Labels),
+		containerd.WithContainerLabels(labels),
 	)
 
 	if err != nil {
@@ -122,7 +128,6 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 func createTask(ctx context.Context, client *containerd.Client, container containerd.Container, cni gocni.CNI) error {
 
 	name := container.ID()
-	// task, taskErr := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
 
 	task, taskErr := container.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
 
@@ -68,6 +68,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
 		if err != nil {
 			return Function{}, fmt.Errorf("unable to get task status for container: %s %s", name, err)
 		}
+
 		if svc.Status == "running" {
 			replicas = 1
 			f.pid = task.Pid()
@@ -85,7 +86,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
 
 		f.replicas = replicas
 		return f, nil
 
 	}
 
 	return Function{}, fmt.Errorf("unable to find function: %s, error %s", name, err)
 }
@@ -11,6 +11,7 @@ import (
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/namespaces"
 	gocni "github.com/containerd/go-cni"
+
 	"github.com/openfaas/faas-provider/types"
 	faasd "github.com/openfaas/faasd/pkg"
 )
@@ -58,46 +59,71 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
 		return
 	}
 
-	taskExists := true
+	var taskExists bool
+	var taskStatus *containerd.Status
+
 	task, taskErr := ctr.Task(ctx, nil)
 	if taskErr != nil {
 		msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr)
 		log.Printf("[Scale] %s\n", msg)
 		taskExists = false
+	} else {
+		taskExists = true
+		status, statusErr := task.Status(ctx)
+		if statusErr != nil {
+			msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr)
+			log.Printf("[Scale] %s\n", msg)
+			http.Error(w, msg, http.StatusInternalServerError)
+			return
+		} else {
+			taskStatus = &status
+		}
 	}
 
-	if req.Replicas > 0 {
-		if taskExists {
-			if status, statusErr := task.Status(ctx); statusErr == nil {
-				if status.Status == containerd.Paused {
-					if resumeErr := task.Resume(ctx); resumeErr != nil {
-						log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
-						http.Error(w, resumeErr.Error(), http.StatusBadRequest)
-					}
-				}
-			}
-		} else {
-			deployErr := createTask(ctx, client, ctr, cni)
-			if deployErr != nil {
-				log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
-				http.Error(w, deployErr.Error(), http.StatusBadRequest)
-				return
-			}
-			return
-		}
-	} else {
-		if taskExists {
-			if status, statusErr := task.Status(ctx); statusErr == nil {
-				if status.Status == containerd.Running {
-					if pauseErr := task.Pause(ctx); pauseErr != nil {
-						log.Printf("[Scale] error pausing task %s, error: %s\n", name, pauseErr)
-						http.Error(w, pauseErr.Error(), http.StatusBadRequest)
-					}
-				}
-			}
-		}
-	}
-}
+	createNewTask := false
+
+	// Scale to zero
+	if req.Replicas == 0 {
+		// If a task is running, pause it
+		if taskExists && taskStatus.Status == containerd.Running {
+			if pauseErr := task.Pause(ctx); pauseErr != nil {
+				wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr)
+				log.Printf("[Scale] %s\n", wrappedPauseErr.Error())
+				http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound)
+				return
+			}
+		}
+	}
+
+	if taskExists {
+		if taskStatus != nil {
+			if taskStatus.Status == containerd.Paused {
+				if resumeErr := task.Resume(ctx); resumeErr != nil {
+					log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
+					http.Error(w, resumeErr.Error(), http.StatusBadRequest)
+					return
+				}
+			} else if taskStatus.Status == containerd.Stopped {
+				// Stopped tasks cannot be restarted, must be removed, and created again
+				if _, delErr := task.Delete(ctx); delErr != nil {
+					log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr)
+					http.Error(w, delErr.Error(), http.StatusBadRequest)
+					return
+				}
+				createNewTask = true
+			}
+		}
+	} else {
+		createNewTask = true
+	}
+
+	if createNewTask {
+		deployErr := createTask(ctx, client, ctr, cni)
+		if deployErr != nil {
+			log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
+			http.Error(w, deployErr.Error(), http.StatusBadRequest)
+			return
+		}
+	}
+}
 }
````
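The reworked scaling handler above reduces to a small decision table: scaling to zero pauses a running task; scaling up resumes a paused task, deletes a stopped task and creates it again, or creates a task when none exists. The sketch below restates that decision logic on its own, detached from containerd, purely as an illustration; the names and string statuses are invented for the example and it is not the handler itself.

```go
package main

import "fmt"

type action string

const (
	pauseTask  action = "pause"
	resumeTask action = "resume"
	recreate   action = "delete+create"
	createNew  action = "create"
	noop       action = "no-op"
)

// decide mirrors the branching of the updated replica handler; the real
// handler performs the pause/resume/delete/create calls itself.
func decide(replicas int, taskExists bool, status string) action {
	if replicas == 0 {
		if taskExists && status == "running" {
			return pauseTask
		}
		return noop
	}

	if !taskExists {
		return createNew
	}

	switch status {
	case "paused":
		return resumeTask
	case "stopped":
		// Stopped tasks cannot be restarted; they are removed and created again.
		return recreate
	}
	return noop
}

func main() {
	fmt.Println(decide(0, true, "running")) // pause
	fmt.Println(decide(1, true, "paused"))  // resume
	fmt.Println(decide(1, true, "stopped")) // delete+create
	fmt.Println(decide(1, false, ""))       // create
}
```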
pkg/proxy.go (77 changed lines)

````diff
@@ -6,74 +6,91 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"strconv"
+	"strings"
+
 	"time"
 )
 
-// NewProxy creates a HTTP proxy to expose the gateway container
-// from OpenFaaS to the host
-func NewProxy(port int, timeout time.Duration) *Proxy {
+// NewProxy creates a HTTP proxy to expose a host
+func NewProxy(upstream string, listenPort uint32, hostIP string, timeout time.Duration, resolver Resolver) *Proxy {
 
 	return &Proxy{
-		Port:    port,
-		Timeout: timeout,
+		Upstream: upstream,
+		Port:     listenPort,
+		HostIP:   hostIP,
+		Timeout:  timeout,
+		Resolver: resolver,
 	}
 }
 
 // Proxy for exposing a private container
 type Proxy struct {
 	Timeout time.Duration
-	Port    int
-}
 
-type proxyState struct {
-	Host string
+	// Port on which to listen to traffic
+	Port uint32
+
+	// Upstream is where to send traffic when received
+	Upstream string
+
+	// The IP to use to bind locally
+	HostIP string
+
+	Resolver Resolver
 }
 
 // Start listening and forwarding HTTP to the host
-func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
-	tcp := p.Port
+func (p *Proxy) Start() error {
 
 	http.DefaultClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
 		return http.ErrUseLastResponse
 	}
-	ps := proxyState{
-		Host: "",
+
+	upstreamHost, upstreamPort, err := getUpstream(p.Upstream, p.Port)
+	if err != nil {
+		return err
 	}
 
-	ps.Host = <-gatewayChan
+	log.Printf("Looking up IP for: %q", upstreamHost)
+	got := make(chan string, 1)
 
-	log.Printf("Starting faasd proxy on %d\n", tcp)
+	go p.Resolver.Get(upstreamHost, got, time.Second*5)
 
-	fmt.Printf("Gateway: %s\n", ps.Host)
+	ipAddress := <-got
+	close(got)
 
-	l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp))
+	upstreamAddr := fmt.Sprintf("%s:%d", ipAddress, upstreamPort)
+
+	localBind := fmt.Sprintf("%s:%d", p.HostIP, p.Port)
+	log.Printf("Proxy from: %s, to: %s (%s)\n", localBind, p.Upstream, ipAddress)
+
+	l, err := net.Listen("tcp", localBind)
 	if err != nil {
 		log.Printf("Error: %s", err.Error())
 		return err
 	}
 
 	defer l.Close()
 
 	for {
 		// Wait for a connection.
 		conn, err := l.Accept()
 		if err != nil {
-			acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error())
+			acceptErr := fmt.Errorf("Unable to accept on %d, error: %s",
+				p.Port,
+				err.Error())
 			log.Printf("%s", acceptErr.Error())
 			return acceptErr
 		}
 
-		upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host))
+		upstream, err := net.Dial("tcp", upstreamAddr)
 
 		if err != nil {
-			log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error())
+			log.Printf("unable to dial to %s, error: %s", upstreamAddr, err.Error())
 			return err
 		}
 
 		go pipe(conn, upstream)
 		go pipe(upstream, conn)
 	}
 }
 
@@ -81,3 +98,19 @@ func pipe(from net.Conn, to net.Conn) {
 	defer from.Close()
 	io.Copy(from, to)
 }
+
+func getUpstream(val string, defaultPort uint32) (string, uint32, error) {
+	upstreamHostname := val
+	upstreamPort := defaultPort
+
+	if in := strings.Index(val, ":"); in > -1 {
+		upstreamHostname = val[:in]
+		port, err := strconv.ParseInt(val[in+1:], 10, 32)
+		if err != nil {
+			return "", defaultPort, err
+		}
+		upstreamPort = uint32(port)
+	}
+
+	return upstreamHostname, upstreamPort, nil
+}
````
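`getUpstream` accepts either a bare service name or `name:port`, falling back to the proxy's published port when no port is given. A short, self-contained usage sketch of the same parsing rule (the function body is copied from the diff above):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// getUpstream splits "name" or "name:port" into hostname and port,
// defaulting to defaultPort when no port is present.
func getUpstream(val string, defaultPort uint32) (string, uint32, error) {
	upstreamHostname := val
	upstreamPort := defaultPort

	if in := strings.Index(val, ":"); in > -1 {
		upstreamHostname = val[:in]
		port, err := strconv.ParseInt(val[in+1:], 10, 32)
		if err != nil {
			return "", defaultPort, err
		}
		upstreamPort = uint32(port)
	}

	return upstreamHostname, upstreamPort, nil
}

func main() {
	host, port, _ := getUpstream("gateway:8080", 8080)
	fmt.Println(host, port) // gateway 8080

	host, port, _ = getUpstream("prometheus", 9090)
	fmt.Println(host, port) // prometheus 9090
}
```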
````diff
@@ -16,7 +16,7 @@ func Test_Proxy_ToPrivateServer(t *testing.T) {
 
 	wantBodyText := "OK"
 	wantBody := []byte(wantBodyText)
-	upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	upstreamSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 
 		if r.Body != nil {
 			defer r.Body.Close()
@@ -27,17 +27,19 @@ func Test_Proxy_ToPrivateServer(t *testing.T) {
 
 	}))
 
-	defer upstream.Close()
+	defer upstreamSvr.Close()
 	port := 8080
-	proxy := NewProxy(port, time.Second*1)
+	u, _ := url.Parse(upstreamSvr.URL)
+	log.Println("Host", u.Host)
+
+	upstreamAddr := u.Host
+	proxy := NewProxy(upstreamAddr, 8080, "127.0.0.1", time.Second*1, &mockResolver{})
 
 	gwChan := make(chan string, 1)
 	doneCh := make(chan bool)
 
-	go proxy.Start(gwChan, doneCh)
-
-	u, _ := url.Parse(upstream.URL)
-	log.Println("Host", u.Host)
+	go proxy.Start()
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	go func() {
@@ -71,3 +73,14 @@ func Test_Proxy_ToPrivateServer(t *testing.T) {
 		doneCh <- true
 	}()
 }
+
+type mockResolver struct {
+}
+
+func (m *mockResolver) Start() {
+
+}
+
+func (m *mockResolver) Get(upstream string, got chan<- string, timeout time.Duration) {
+	got <- upstream
+}
````
pkg/resolver.go (new file, 12 lines)

```go
package pkg

import "time"

// Resolver resolves an upstream IP address for a given upstream host
type Resolver interface {
	// Start any polling or connections required to resolve
	Start()

	// Get an IP address using an asynchronous operation
	Get(upstream string, got chan<- string, timeout time.Duration)
}
```
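Any type with `Start()` and `Get(upstream, got, timeout)` satisfies this interface, which is how the `mockResolver` in the proxy test stands in for the hosts-file resolver. Below is a minimal illustrative implementation backed by a fixed map; it is only a sketch of the contract, not part of faasd.

```go
package main

import (
	"fmt"
	"time"
)

// staticResolver answers look-ups from a fixed map. Start is a no-op and Get
// sends the result on the supplied channel, mirroring the Resolver contract.
type staticResolver struct {
	entries map[string]string
}

func (s *staticResolver) Start() {}

func (s *staticResolver) Get(upstream string, got chan<- string, timeout time.Duration) {
	if ip, ok := s.entries[upstream]; ok {
		got <- ip
		return
	}
	// Like LocalResolver, give up quietly once the timeout has passed.
	time.Sleep(timeout)
}

func main() {
	r := &staticResolver{entries: map[string]string{"gateway": "10.62.0.10"}}

	got := make(chan string, 1)
	go r.Get("gateway", got, time.Second)
	fmt.Println(<-got)
}
```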
````diff
@@ -41,6 +41,13 @@ type Service struct {
 	Caps      []string
 	Args      []string
 	DependsOn []string
+	Ports     []ServicePort
+}
+
+type ServicePort struct {
+	TargetPort uint32
+	Port       uint32
+	HostIP     string
 }
 
 type Mount struct {
@@ -154,7 +161,7 @@ func (s *Supervisor) Start(svcs []Service) error {
 			Options: []string{"rbind", "ro"},
 		})
 
-		newContainer, containerCreateErr := s.client.NewContainer(
+		newContainer, err := s.client.NewContainer(
 			ctx,
 			svc.Name,
 			containerd.WithImage(image),
@@ -166,14 +173,14 @@ func (s *Supervisor) Start(svcs []Service) error {
 				oci.WithEnv(svc.Env)),
 		)
 
-		if containerCreateErr != nil {
-			log.Printf("Error creating container: %s\n", containerCreateErr)
-			return containerCreateErr
+		if err != nil {
+			log.Printf("Error creating container: %s\n", err)
+			return err
 		}
 
 		log.Printf("Created container: %s\n", newContainer.ID())
 
-		task, err := newContainer.NewTask(ctx, cio.NewCreator(cio.WithStdio))
+		task, err := newContainer.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
 		if err != nil {
 			log.Printf("Error creating task: %s\n", err)
 			return err
@@ -181,13 +188,14 @@ func (s *Supervisor) Start(svcs []Service) error {
 
 		labels := map[string]string{}
 		network, err := cninetwork.CreateCNINetwork(ctx, s.cni, task, labels)
 
 		if err != nil {
+			log.Printf("Error creating CNI for %s: %s", svc.Name, err)
 			return err
 		}
 
 		ip, err := cninetwork.GetIPAddress(network, task)
 		if err != nil {
+			log.Printf("Error getting IP for %s: %s", svc.Name, err)
 			return err
 		}
 
@@ -297,12 +305,26 @@ func ParseCompose(config *compose.Config) ([]Service, error) {
 			Env:       env,
 			Mounts:    mounts,
 			DependsOn: s.DependsOn,
+			Ports:     convertPorts(s.Ports),
 		}
 	}
 
 	return services, nil
 }
+
+func convertPorts(ports []compose.ServicePortConfig) []ServicePort {
+	servicePorts := []ServicePort{}
+	for _, p := range ports {
+		servicePorts = append(servicePorts, ServicePort{
+			Port:       p.Published,
+			TargetPort: p.Target,
+			HostIP:     p.HostIP,
+		})
+	}
+
+	return servicePorts
+}
 
 // LoadComposeFile is a helper method for loading a docker-compose file
 func LoadComposeFile(wd string, file string) (*compose.Config, error) {
 	return LoadComposeFileWithArch(wd, file, env.GetClientArch)
````
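`convertPorts` copies the compose loader's `Published`, `Target` and `HostIP` fields into faasd's `ServicePort`, so a compose entry such as `- "127.0.0.1:9090:9090"` becomes `{Port: 9090, TargetPort: 9090, HostIP: "127.0.0.1"}`. The sketch below shows that mapping with local stand-in types; the real code receives `compose.ServicePortConfig` values produced by the compose file loader, and the hand-built inputs here merely assume how that loader fills the fields.

```go
package main

import "fmt"

// Stand-ins for compose.ServicePortConfig and the ServicePort type added in
// this diff; field names follow the diff.
type servicePortConfig struct {
	Published uint32
	Target    uint32
	HostIP    string
}

type servicePort struct {
	TargetPort uint32
	Port       uint32
	HostIP     string
}

func convertPorts(ports []servicePortConfig) []servicePort {
	servicePorts := []servicePort{}
	for _, p := range ports {
		servicePorts = append(servicePorts, servicePort{
			Port:       p.Published,
			TargetPort: p.Target,
			HostIP:     p.HostIP,
		})
	}

	return servicePorts
}

func main() {
	// Roughly what `- "127.0.0.1:9090:9090"` and `- "8080:8080"` load as.
	in := []servicePortConfig{
		{Published: 9090, Target: 9090, HostIP: "127.0.0.1"},
		{Published: 8080, Target: 8080},
	}

	for _, sp := range convertPorts(in) {
		fmt.Printf("%+v\n", sp)
	}
}
```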
pkg/testdata/docker-compose.yaml (vendored, 6 changed lines)

````diff
@@ -26,6 +26,8 @@ services:
       - "8222"
       - "--store=memory"
       - "--cluster_id=faas-cluster"
+    ports:
+       - "127.0.0.1:8222:8222"
 
   prometheus:
     image: docker.io/prom/prometheus:v2.14.0
@@ -35,6 +37,8 @@ services:
         target: /etc/prometheus/prometheus.yml
     cap_add:
       - CAP_NET_RAW
+    ports:
+       - "127.0.0.1:9090:9090"
 
   gateway:
     image: "docker.io/openfaas/gateway:0.18.17${ARCH_SUFFIX}"
@@ -65,6 +69,8 @@ services:
      - basic-auth-plugin
      - nats
      - prometheus
+    ports:
+       - "8080:8080"
 
   queue-worker:
     image: docker.io/openfaas/queue-worker:0.11.2
````