Compare commits

..

1 commit

97c11b3f5d Proxy the gateway using TCP
There appeared to be an issue with logs appearing (#98 and #68).

@LucasRoesler spent a considerable amount of time looking into
this and concluded that the faas-provider, and the approach we
are taking to stream logs from journalctl as a process, were
working as expected.

The issue appears to have been with the proxy code and its
use of an HTTP connection. Somewhere within the code, a buffer
was holding onto the data before flushing it 20-30 seconds later.

This appeared to users as if the logs were not working at all.
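The gist of the fix is to splice the two TCP connections directly rather than proxying at the HTTP layer, so bytes are forwarded as soon as they are read and no userspace HTTP buffer can sit on chunked log output. A minimal sketch of the technique (illustrative only, with made-up addresses; not the faasd code itself):

```go
package main

import (
	"io"
	"log"
	"net"
)

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := l.Accept()
		if err != nil {
			log.Fatal(err)
		}
		// Hypothetical upstream; in faasd this is the gateway container.
		upstream, err := net.Dial("tcp", "127.0.0.1:8081")
		if err != nil {
			log.Fatal(err)
		}
		// Copy each direction independently; io.Copy forwards data as soon
		// as each Read returns, so nothing is held back for batching.
		go func() { defer upstream.Close(); io.Copy(upstream, conn) }()
		go func() { defer conn.Close(); io.Copy(conn, upstream) }()
	}
}
```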

Before the fix, the gateway container was tested by exposing
it over an SSH tunnel and an inlets tunnel; both worked as
expected. The updates have been tested on multipass with
Ubuntu 18.04 and a binary built locally.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-09-08 11:33:11 +01:00
13 changed files with 100 additions and 396 deletions

View File

@ -7,7 +7,7 @@ faasd is built for everyone else, for those who have no desire to manage expensi
 [![OpenFaaS](https://img.shields.io/badge/openfaas-serverless-blue.svg)](https://www.openfaas.com)
 ![Downloads](https://img.shields.io/github/downloads/openfaas/faasd/total)
-faasd is [OpenFaaS](https://github.com/openfaas/) reimagined, but without the cost and complexity of Kubernetes. It runs on a single host with very modest requirements, making it fast and easy to manage. Under the hood it uses [containerd](https://containerd.io/) and [Container Networking Interface (CNI)](https://github.com/containernetworking/cni) along with the same core OpenFaaS components from the main project.
+faasd is [OpenFaaS](https://github.com/openfaas/) reimagined, but without the cost and complexity of Kubernetes. It runs on a single host with very modest requirements, making it very fast and easy to manage. Under the hood it uses [containerd](https://containerd.io/) and [Container Networking Interface (CNI)](https://github.com/containernetworking/cni) along with the same core OpenFaaS components from the main project.

 ## When should you use faasd over OpenFaaS on Kubernetes?
@ -56,8 +56,6 @@ Automate everything within < 60 seconds and get a public URL and IP address back
 * [Provision faasd on DigitalOcean with built-in TLS support](docs/bootstrap/digitalocean-terraform/README.md)

-## Operational concerns
-
 ### A note on private repos / registries

 To use private image repos, `~/.docker/config.json` needs to be copied to `/var/lib/faasd/.docker/config.json`.
@ -89,65 +87,6 @@ journalctl -t openfaas-fn:figlet -f &
 echo logs | faas-cli invoke figlet
 ```

-### Logs for the core services
-Core services, as defined in the docker-compose.yaml file, are deployed as containers by faasd.
-View the logs for a component by giving its NAME:
-```bash
-journalctl -t default:NAME
-journalctl -t default:gateway
-journalctl -t default:queue-worker
-```
-You can also use `-f` to follow the logs, `--lines` to tail a number of lines, or `--since` to give a timeframe.
-### Exposing core services
-The OpenFaaS stack is made up of several core services, including NATS and Prometheus. You can expose these through the `docker-compose.yaml` file located at `/var/lib/faasd`.
-Expose the gateway to all adapters:
-```yaml
-gateway:
-  ports:
-    - "8080:8080"
-```
-Expose Prometheus only to 127.0.0.1:
-```yaml
-prometheus:
-  ports:
-    - "127.0.0.1:9090:9090"
-```
-### Upgrading faasd
-To upgrade `faasd`, either re-create your VM using Terraform, or simply replace the faasd binary with a newer one.
-```bash
-systemctl stop faasd-provider
-systemctl stop faasd
-# Replace /usr/local/bin/faasd with the desired release
-# Replace /var/lib/faasd/docker-compose.yaml with the matching version for
-# that release.
-# Remember to keep any custom patches you make, such as exposing additional
-# ports or updating timeout values
-systemctl start faasd
-systemctl start faasd-provider
-```
-You could also perform this task over SSH, or use a configuration management tool.
-> Note: if you are using Caddy or Let's Encrypt for free SSL certificates, you may hit rate-limits for generating new certificates if you do this too often within a given week.
 ## What does faasd deploy?

 * faasd - itself, and its [faas-provider](https://github.com/openfaas/faas-provider) for containerd - CRUD for functions and services, implements the OpenFaaS REST API
@ -180,15 +119,7 @@ For community functions see `faas-cli store --help`
 For templates built by the community see: `faas-cli template store list`; you can also use the `dockerfile` template if you just want to migrate an existing service without the benefits of using a template.

-### Training and courses
-#### LinuxFoundation training course
-The founder of faasd and OpenFaaS has written a training course for the LinuxFoundation which also covers how to use OpenFaaS on Kubernetes. Many of the same concepts can be applied to faasd, and the course is free:
-* [Introduction to Serverless on Kubernetes](https://www.edx.org/course/introduction-to-serverless-on-kubernetes)
-#### Community workshop
+### Workshop
 [The OpenFaaS workshop](https://github.com/openfaas/workshop/) is a set of 12 self-paced labs and provides a great starting point for learning the features of OpenFaaS. Not all features will be available or usable with faasd.
@ -253,9 +184,4 @@ Other operations are pending development in the provider such as:
 * [x] Configure `basic_auth` to protect the OpenFaaS gateway and faasd-provider HTTP API
 * [x] Setup custom working directory for faasd `/var/lib/faasd/`
 * [x] Use CNI to create network namespaces and adapters
-* [x] Optionally expose core services from the docker-compose.yaml file, locally or to all adapters.
-
-WIP:
-* [ ] Annotation support (PR ready)
-* [ ] Hard memory limits for functions (PR ready)

View File

@ -7,6 +7,7 @@ import (
 	"os"
 	"os/signal"
 	"path"
+	"strings"
 	"sync"
 	"syscall"
 	"time"
@ -71,15 +72,20 @@ func runUp(cmd *cobra.Command, _ []string) error {
 	log.Printf("Supervisor created in: %s\n", time.Since(start).String())

 	start = time.Now()
-	if err := supervisor.Start(services); err != nil {
+	err = supervisor.Start(services)
+	if err != nil {
 		return err
 	}
 	defer supervisor.Close()

 	log.Printf("Supervisor init done in: %s\n", time.Since(start).String())

 	shutdownTimeout := time.Second * 1
 	timeout := time.Second * 60
+	proxyDoneCh := make(chan bool)

 	wg := sync.WaitGroup{}
 	wg.Add(1)
@ -96,38 +102,38 @@ func runUp(cmd *cobra.Command, _ []string) error {
 			fmt.Println(err)
 		}

-		// TODO: close proxies
+		// Close proxy
+		proxyDoneCh <- true

 		time.AfterFunc(shutdownTimeout, func() {
 			wg.Done()
 		})
 	}()
-	localResolver := pkg.NewLocalResolver(path.Join(cfg.workingDir, "hosts"))
-	go localResolver.Start()
-
-	proxies := map[uint32]*pkg.Proxy{}
-	for _, svc := range services {
-		for _, port := range svc.Ports {
-			listenPort := port.Port
-			if _, ok := proxies[listenPort]; ok {
-				return fmt.Errorf("port %d already allocated", listenPort)
-			}
-
-			hostIP := "0.0.0.0"
-			if len(port.HostIP) > 0 {
-				hostIP = port.HostIP
-			}
-
-			upstream := fmt.Sprintf("%s:%d", svc.Name, port.TargetPort)
-			proxies[listenPort] = pkg.NewProxy(upstream, listenPort, hostIP, timeout, localResolver)
-		}
-	}
-
-	// TODO: track proxies for later cancellation when receiving sigint/term
-	for _, v := range proxies {
-		go v.Start()
-	}
+	gatewayURLChan := make(chan string, 1)
+	proxyPort := 8080
+	proxy := pkg.NewProxy(proxyPort, timeout)
+	go proxy.Start(gatewayURLChan, proxyDoneCh)
+
+	go func() {
+		time.Sleep(3 * time.Second)
+
+		fileData, fileErr := ioutil.ReadFile(path.Join(cfg.workingDir, "hosts"))
+		if fileErr != nil {
+			log.Println(fileErr)
+			return
+		}
+
+		host := ""
+		lines := strings.Split(string(fileData), "\n")
+		for _, line := range lines {
+			if strings.Index(line, "gateway") > -1 {
+				host = line[:strings.Index(line, "\t")]
+			}
+		}
+		log.Printf("[up] Sending %s to proxy\n", host)
+		gatewayURLChan <- host + ":8080"
+		close(gatewayURLChan)
+	}()
 	wg.Wait()
 	return nil
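The new goroutine above waits for containerd to write the hosts file, then extracts the gateway's IP from it. A self-contained sketch of that lookup, using a hypothetical `gatewayHost` helper and sample data (not part of faasd):

```go
package main

import (
	"fmt"
	"strings"
)

// gatewayHost scans a containerd-style hosts file where each line is
// "<IP>\t<hostname>", returning the IP of the "gateway" entry.
func gatewayHost(hostsFile string) (string, bool) {
	for _, line := range strings.Split(hostsFile, "\n") {
		if strings.Contains(line, "gateway") {
			if tab := strings.Index(line, "\t"); tab > -1 {
				return line[:tab], true
			}
		}
	}
	return "", false
}

func main() {
	data := "10.62.0.5\tfaasd-provider\n10.62.0.6\tgateway\n"
	if host, ok := gatewayHost(data); ok {
		fmt.Println(host + ":8080") // 10.62.0.6:8080
	}
}
```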

View File

@ -1,7 +1,7 @@
 version: "3.7"
 services:
   basic-auth-plugin:
-    image: "docker.io/openfaas/basic-auth-plugin:0.18.18${ARCH_SUFFIX}"
+    image: "docker.io/openfaas/basic-auth-plugin:0.18.17${ARCH_SUFFIX}"
     environment:
       - port=8080
       - secret_mount_path=/run/secrets
@ -26,8 +26,6 @@ services:
       - "8222"
       - "--store=memory"
       - "--cluster_id=faas-cluster"
-    # ports:
-    #  - "127.0.0.1:8222:8222"

   prometheus:
     image: docker.io/prom/prometheus:v2.14.0
@ -37,11 +35,9 @@ services:
         target: /etc/prometheus/prometheus.yml
     cap_add:
       - CAP_NET_RAW
-    ports:
-      - "127.0.0.1:9090:9090"

   gateway:
-    image: "docker.io/openfaas/gateway:0.18.18${ARCH_SUFFIX}"
+    image: "docker.io/openfaas/gateway:0.18.17${ARCH_SUFFIX}"
     environment:
       - basic_auth=true
       - functions_provider_url=http://faasd-provider:8081/
@ -69,8 +65,6 @@ services:
       - basic-auth-plugin
       - nats
       - prometheus
-    ports:
-      - "8080:8080"

   queue-worker:
     image: docker.io/openfaas/queue-worker:0.11.2

View File

@ -1,104 +0,0 @@
-package pkg
-
-import (
-	"io/ioutil"
-	"log"
-	"os"
-	"strings"
-	"sync"
-	"time"
-)
-
-// LocalResolver provides hostname to IP look-up for faasd core services
-type LocalResolver struct {
-	Path  string
-	Map   map[string]string
-	Mutex *sync.RWMutex
-}
-
-// NewLocalResolver creates a new resolver for reading from a hosts file
-func NewLocalResolver(path string) Resolver {
-	return &LocalResolver{
-		Path:  path,
-		Mutex: &sync.RWMutex{},
-		Map:   make(map[string]string),
-	}
-}
-
-// Start polling the disk for the hosts file in Path
-func (l *LocalResolver) Start() {
-	var lastStat os.FileInfo
-
-	for {
-		rebuild := false
-		if info, err := os.Stat(l.Path); err == nil {
-			if lastStat == nil {
-				rebuild = true
-			} else {
-				if !lastStat.ModTime().Equal(info.ModTime()) {
-					rebuild = true
-				}
-			}
-			lastStat = info
-		}
-
-		if rebuild {
-			log.Printf("Resolver rebuilding map")
-			l.rebuild()
-		}
-		time.Sleep(time.Second * 3)
-	}
-}
-
-func (l *LocalResolver) rebuild() {
-	l.Mutex.Lock()
-	defer l.Mutex.Unlock()
-
-	fileData, fileErr := ioutil.ReadFile(l.Path)
-	if fileErr != nil {
-		log.Printf("resolver rebuild error: %s", fileErr.Error())
-		return
-	}
-
-	lines := strings.Split(string(fileData), "\n")
-	for _, line := range lines {
-		index := strings.Index(line, "\t")
-		if len(line) > 0 && index > -1 {
-			ip := line[:index]
-			host := line[index+1:]
-			log.Printf("Resolver: %q=%q", host, ip)
-			l.Map[host] = ip
-		}
-	}
-}
-
-// Get resolves a hostname to an IP, or times out after the duration has passed
-func (l *LocalResolver) Get(upstream string, got chan<- string, timeout time.Duration) {
-	start := time.Now()
-	for {
-		if val := l.get(upstream); len(val) > 0 {
-			got <- val
-			break
-		}
-
-		if time.Now().After(start.Add(timeout)) {
-			log.Printf("Timed out after %s getting host %q", timeout.String(), upstream)
-			break
-		}
-		time.Sleep(time.Millisecond * 250)
-	}
-}
-
-func (l *LocalResolver) get(upstream string) string {
-	l.Mutex.RLock()
-	defer l.Mutex.RUnlock()
-
-	if val, ok := l.Map[upstream]; ok {
-		return val
-	}
-	return ""
-}
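For reference, the deleted resolver was consumed through a channel so callers could bound the wait. A usage sketch, assuming it runs against the resolver as it existed before this change (imported here as `pkg`), with error handling kept minimal:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"time"

	pkg "github.com/openfaas/faasd/pkg"
)

func main() {
	// Write a throwaway hosts file in the "<IP>\t<hostname>" format the resolver polls.
	tmp, err := ioutil.TempFile("", "hosts")
	if err != nil {
		panic(err)
	}
	ioutil.WriteFile(tmp.Name(), []byte("10.62.0.6\tgateway\n"), 0644)

	r := pkg.NewLocalResolver(tmp.Name())
	go r.Start()

	// Get delivers the IP on the channel, or logs a timeout after 5s.
	got := make(chan string, 1)
	go r.Get("gateway", got, time.Second*5)
	fmt.Println(<-got) // 10.62.0.6
}
```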

View File

@ -69,7 +69,6 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 	if err != nil {
 		return err
 	}

 	imgRef := reference.TagNameOnly(r).String()

 	snapshotter := ""
@ -99,11 +98,6 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 	name := req.Service

-	labels := map[string]string{}
-	if req.Labels != nil {
-		labels = *req.Labels
-	}
-
 	container, err := client.NewContainer(
 		ctx,
 		name,
@ -114,7 +108,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 			oci.WithCapabilities([]string{"CAP_NET_RAW"}),
 			oci.WithMounts(mounts),
 			oci.WithEnv(envs)),
-		containerd.WithContainerLabels(labels),
+		containerd.WithContainerLabels(*req.Labels),
 	)

 	if err != nil {
@ -128,6 +122,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
 func createTask(ctx context.Context, client *containerd.Client, container containerd.Container, cni gocni.CNI) error {

 	name := container.ID()

+	// task, taskErr := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
 	task, taskErr := container.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))

View File

@ -68,7 +68,6 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
 	if err != nil {
 		return Function{}, fmt.Errorf("unable to get task status for container: %s %s", name, err)
 	}

 	if svc.Status == "running" {
 		replicas = 1
 		f.pid = task.Pid()
@ -86,7 +85,7 @@ func GetFunction(client *containerd.Client, name string) (Function, error) {
 		f.replicas = replicas
 		return f, nil
 	}
 	}

 	return Function{}, fmt.Errorf("unable to find function: %s, error %s", name, err)
 }

View File

@ -11,7 +11,6 @@ import (
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/namespaces"
 	gocni "github.com/containerd/go-cni"

 	"github.com/openfaas/faas-provider/types"
 	faasd "github.com/openfaas/faasd/pkg"
 )
@ -59,71 +58,46 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
 			return
 		}

-		var taskExists bool
-		var taskStatus *containerd.Status
+		taskExists := true
 		task, taskErr := ctr.Task(ctx, nil)
 		if taskErr != nil {
 			msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr)
 			log.Printf("[Scale] %s\n", msg)
 			taskExists = false
-		} else {
-			taskExists = true
-			status, statusErr := task.Status(ctx)
-			if statusErr != nil {
-				msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr)
-				log.Printf("[Scale] %s\n", msg)
-				http.Error(w, msg, http.StatusInternalServerError)
-				return
-			} else {
-				taskStatus = &status
-			}
 		}

-		createNewTask := false
-
-		// Scale to zero
-		if req.Replicas == 0 {
-			// If a task is running, pause it
-			if taskExists && taskStatus.Status == containerd.Running {
-				if pauseErr := task.Pause(ctx); pauseErr != nil {
-					wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr)
-					log.Printf("[Scale] %s\n", wrappedPauseErr.Error())
-					http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound)
-					return
-				}
-			}
-		}
-
-		if taskExists {
-			if taskStatus != nil {
-				if taskStatus.Status == containerd.Paused {
-					if resumeErr := task.Resume(ctx); resumeErr != nil {
-						log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
-						http.Error(w, resumeErr.Error(), http.StatusBadRequest)
-						return
-					}
-				} else if taskStatus.Status == containerd.Stopped {
-					// Stopped tasks cannot be restarted, must be removed, and created again
-					if _, delErr := task.Delete(ctx); delErr != nil {
-						log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr)
-						http.Error(w, delErr.Error(), http.StatusBadRequest)
-						return
-					}
-					createNewTask = true
-				}
-			}
-		} else {
-			createNewTask = true
-		}
-
-		if createNewTask {
-			deployErr := createTask(ctx, client, ctr, cni)
-			if deployErr != nil {
-				log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
-				http.Error(w, deployErr.Error(), http.StatusBadRequest)
-				return
+		if req.Replicas > 0 {
+			if taskExists {
+				if status, statusErr := task.Status(ctx); statusErr == nil {
+					if status.Status == containerd.Paused {
+						if resumeErr := task.Resume(ctx); resumeErr != nil {
+							log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
+							http.Error(w, resumeErr.Error(), http.StatusBadRequest)
+						}
+					}
+				}
+			} else {
+				deployErr := createTask(ctx, client, ctr, cni)
+				if deployErr != nil {
+					log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
+					http.Error(w, deployErr.Error(), http.StatusBadRequest)
+					return
+				}
+			}
+		} else {
+			if taskExists {
+				if status, statusErr := task.Status(ctx); statusErr == nil {
+					if status.Status == containerd.Running {
+						if pauseErr := task.Pause(ctx); pauseErr != nil {
+							log.Printf("[Scale] error pausing task %s, error: %s\n", name, pauseErr)
+							http.Error(w, pauseErr.Error(), http.StatusBadRequest)
+						}
+					}
+				}
 			}
 		}
 	}
 }
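The rewritten handler boils down to a small decision table: scale up resumes a paused task or creates a missing one; scale to zero pauses a running task. A hypothetical pure-function restatement (not part of faasd), for illustration:

```go
package main

import "fmt"

type action string

const (
	actResume action = "resume"
	actCreate action = "create task"
	actPause  action = "pause"
	actNone   action = "no-op"
)

// decide maps desired replicas and the current task state to one action.
func decide(replicas int, taskExists bool, status string) action {
	if replicas > 0 {
		if !taskExists {
			return actCreate
		}
		if status == "paused" {
			return actResume
		}
		return actNone
	}
	if taskExists && status == "running" {
		return actPause
	}
	return actNone
}

func main() {
	fmt.Println(decide(1, false, ""))       // create task
	fmt.Println(decide(1, true, "paused"))  // resume
	fmt.Println(decide(0, true, "running")) // pause
}
```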

View File

@ -6,91 +6,74 @@ import (
 	"log"
 	"net"
 	"net/http"
-	"strconv"
-	"strings"
 	"time"
 )
-// NewProxy creates a HTTP proxy to expose a host
-func NewProxy(upstream string, listenPort uint32, hostIP string, timeout time.Duration, resolver Resolver) *Proxy {
+// NewProxy creates a HTTP proxy to expose the gateway container
+// from OpenFaaS to the host
+func NewProxy(port int, timeout time.Duration) *Proxy {
 	return &Proxy{
-		Upstream: upstream,
-		Port:     listenPort,
-		HostIP:   hostIP,
-		Timeout:  timeout,
-		Resolver: resolver,
+		Port:    port,
+		Timeout: timeout,
 	}
 }

 // Proxy for exposing a private container
 type Proxy struct {
 	Timeout time.Duration
+	Port    int
+}

-	// Port on which to listen to traffic
-	Port uint32
-
-	// Upstream is where to send traffic when received
-	Upstream string
-
-	// The IP to use to bind locally
-	HostIP string
-
-	Resolver Resolver
+type proxyState struct {
+	Host string
 }

 // Start listening and forwarding HTTP to the host
-func (p *Proxy) Start() error {
+func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
+	tcp := p.Port
+
 	http.DefaultClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
 		return http.ErrUseLastResponse
 	}

-	upstreamHost, upstreamPort, err := getUpstream(p.Upstream, p.Port)
-	if err != nil {
-		return err
+	ps := proxyState{
+		Host: "",
 	}

-	log.Printf("Looking up IP for: %q", upstreamHost)
-	got := make(chan string, 1)
-	go p.Resolver.Get(upstreamHost, got, time.Second*5)
-	ipAddress := <-got
-	close(got)
+	ps.Host = <-gatewayChan

-	upstreamAddr := fmt.Sprintf("%s:%d", ipAddress, upstreamPort)
-	localBind := fmt.Sprintf("%s:%d", p.HostIP, p.Port)
-	log.Printf("Proxy from: %s, to: %s (%s)\n", localBind, p.Upstream, ipAddress)
+	log.Printf("Starting faasd proxy on %d\n", tcp)
+	fmt.Printf("Gateway: %s\n", ps.Host)

-	l, err := net.Listen("tcp", localBind)
+	l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp))
 	if err != nil {
 		log.Printf("Error: %s", err.Error())
 		return err
 	}
 	defer l.Close()

 	for {
 		// Wait for a connection.
 		conn, err := l.Accept()
 		if err != nil {
-			acceptErr := fmt.Errorf("Unable to accept on %d, error: %s",
-				p.Port,
-				err.Error())
+			acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error())
 			log.Printf("%s", acceptErr.Error())
 			return acceptErr
 		}

-		upstream, err := net.Dial("tcp", upstreamAddr)
+		upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host))
 		if err != nil {
-			log.Printf("unable to dial to %s, error: %s", upstreamAddr, err.Error())
+			log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error())
 			return err
 		}

 		go pipe(conn, upstream)
 		go pipe(upstream, conn)
 	}
 }
@ -98,19 +81,3 @@ func pipe(from net.Conn, to net.Conn) {
 	defer from.Close()
 	io.Copy(from, to)
 }
-func getUpstream(val string, defaultPort uint32) (string, uint32, error) {
-	upstreamHostname := val
-	upstreamPort := defaultPort
-
-	if in := strings.Index(val, ":"); in > -1 {
-		upstreamHostname = val[:in]
-		port, err := strconv.ParseInt(val[in+1:], 10, 32)
-		if err != nil {
-			return "", defaultPort, err
-		}
-		upstreamPort = uint32(port)
-	}
-
-	return upstreamHostname, upstreamPort, nil
-}
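Wiring the new API together: `Start` blocks until the gateway address arrives on the channel, then accepts connections. A minimal sketch using the signatures from this diff; the address is made up:

```go
package main

import (
	"time"

	pkg "github.com/openfaas/faasd/pkg"
)

func main() {
	proxy := pkg.NewProxy(8080, 60*time.Second)

	gatewayChan := make(chan string, 1)
	done := make(chan bool)

	// Start blocks on gatewayChan before it opens the listener on 0.0.0.0:8080.
	go proxy.Start(gatewayChan, done)
	gatewayChan <- "10.62.0.6:8080" // hypothetical gateway address from the hosts file

	time.Sleep(time.Minute) // keep the sketch alive while the proxy accepts connections
}
```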

View File

@ -16,7 +16,7 @@ func Test_Proxy_ToPrivateServer(t *testing.T) {
 	wantBodyText := "OK"
 	wantBody := []byte(wantBodyText)

-	upstreamSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		if r.Body != nil {
 			defer r.Body.Close()
@ -27,19 +27,17 @@ func Test_Proxy_ToPrivateServer(t *testing.T) {
 	}))
-	defer upstreamSvr.Close()
+	defer upstream.Close()

 	port := 8080
-	u, _ := url.Parse(upstreamSvr.URL)
-	log.Println("Host", u.Host)
-	upstreamAddr := u.Host
-	proxy := NewProxy(upstreamAddr, 8080, "127.0.0.1", time.Second*1, &mockResolver{})
+	proxy := NewProxy(port, time.Second*1)

 	gwChan := make(chan string, 1)
 	doneCh := make(chan bool)

-	go proxy.Start()
+	go proxy.Start(gwChan, doneCh)
+
+	u, _ := url.Parse(upstream.URL)
+	log.Println("Host", u.Host)

 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	go func() {
@ -73,14 +71,3 @@ func Test_Proxy_ToPrivateServer(t *testing.T) {
 		doneCh <- true
 	}()
 }
-type mockResolver struct {
-}
-
-func (m *mockResolver) Start() {
-}
-
-func (m *mockResolver) Get(upstream string, got chan<- string, timeout time.Duration) {
-	got <- upstream
-}

View File

@ -1,12 +0,0 @@
-package pkg
-
-import "time"
-
-// Resolver resolves an upstream IP address for a given upstream host
-type Resolver interface {
-	// Start any polling or connections required to resolve
-	Start()
-
-	// Get an IP address using an asynchronous operation
-	Get(upstream string, got chan<- string, timeout time.Duration)
-}

View File

@ -41,13 +41,6 @@ type Service struct {
 	Caps      []string
 	Args      []string
 	DependsOn []string
-	Ports     []ServicePort
-}
-
-type ServicePort struct {
-	TargetPort uint32
-	Port       uint32
-	HostIP     string
 }

 type Mount struct {
@ -161,7 +154,7 @@ func (s *Supervisor) Start(svcs []Service) error {
 			Options: []string{"rbind", "ro"},
 		})

-	newContainer, err := s.client.NewContainer(
+	newContainer, containerCreateErr := s.client.NewContainer(
 		ctx,
 		svc.Name,
 		containerd.WithImage(image),
@ -173,14 +166,14 @@ func (s *Supervisor) Start(svcs []Service) error {
 			oci.WithEnv(svc.Env)),
 	)

-	if err != nil {
-		log.Printf("Error creating container: %s\n", err)
-		return err
+	if containerCreateErr != nil {
+		log.Printf("Error creating container: %s\n", containerCreateErr)
+		return containerCreateErr
 	}

 	log.Printf("Created container: %s\n", newContainer.ID())

-	task, err := newContainer.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
+	task, err := newContainer.NewTask(ctx, cio.NewCreator(cio.WithStdio))
 	if err != nil {
 		log.Printf("Error creating task: %s\n", err)
 		return err
@ -188,14 +181,13 @@ func (s *Supervisor) Start(svcs []Service) error {
 	labels := map[string]string{}
 	network, err := cninetwork.CreateCNINetwork(ctx, s.cni, task, labels)
 	if err != nil {
-		log.Printf("Error creating CNI for %s: %s", svc.Name, err)
 		return err
 	}

 	ip, err := cninetwork.GetIPAddress(network, task)
 	if err != nil {
-		log.Printf("Error getting IP for %s: %s", svc.Name, err)
 		return err
 	}
@ -305,26 +297,12 @@ func ParseCompose(config *compose.Config) ([]Service, error) {
 			Env:       env,
 			Mounts:    mounts,
 			DependsOn: s.DependsOn,
-			Ports:     convertPorts(s.Ports),
 		}
 	}

 	return services, nil
 }
-func convertPorts(ports []compose.ServicePortConfig) []ServicePort {
-	servicePorts := []ServicePort{}
-	for _, p := range ports {
-		servicePorts = append(servicePorts, ServicePort{
-			Port:       p.Published,
-			TargetPort: p.Target,
-			HostIP:     p.HostIP,
-		})
-	}
-
-	return servicePorts
-}

 // LoadComposeFile is a helper method for loading a docker-compose file
 func LoadComposeFile(wd string, file string) (*compose.Config, error) {
 	return LoadComposeFileWithArch(wd, file, env.GetClientArch)

View File

@ -26,8 +26,6 @@ services:
       - "8222"
       - "--store=memory"
       - "--cluster_id=faas-cluster"
-    ports:
-      - "127.0.0.1:8222:8222"

   prometheus:
     image: docker.io/prom/prometheus:v2.14.0
@ -37,8 +35,6 @@ services:
         target: /etc/prometheus/prometheus.yml
     cap_add:
       - CAP_NET_RAW
-    ports:
-      - "127.0.0.1:9090:9090"

   gateway:
     image: "docker.io/openfaas/gateway:0.18.17${ARCH_SUFFIX}"
@ -69,8 +65,6 @@ services:
       - basic-auth-plugin
       - nats
       - prometheus
-    ports:
-      - "8080:8080"

   queue-worker:
     image: docker.io/openfaas/queue-worker:0.11.2