Add local resolver for system containers

System containers can now be proxied to the localhost or to
all adapters using docker-compose.

Tested with NATS and Prometheus to 127.0.0.1 in multipass
and with the gateway to 0.0.0.0.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2020-09-18 11:38:28 +01:00
committed by Alex Ellis
parent 4189cfe52c
commit c314af4f98
3 changed files with 186 additions and 46 deletions

View File

@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"
@ -85,7 +84,7 @@ func runUp(cmd *cobra.Command, _ []string) error {
shutdownTimeout := time.Second * 1
timeout := time.Second * 60
proxyDoneCh := make(chan bool)
// proxyDoneCh := make(chan bool)
wg := sync.WaitGroup{}
wg.Add(1)
@ -103,37 +102,39 @@ func runUp(cmd *cobra.Command, _ []string) error {
}
// Close proxy
proxyDoneCh <- true
// proxyDoneCh <- true
time.AfterFunc(shutdownTimeout, func() {
wg.Done()
})
}()
gatewayURLChan := make(chan string, 1)
proxyPort := 8080
proxy := pkg.NewProxy(proxyPort, timeout)
go proxy.Start(gatewayURLChan, proxyDoneCh)
localResolver := pkg.NewLocalResolver(path.Join(cfg.workingDir, "hosts"))
go localResolver.Start()
go func() {
time.Sleep(3 * time.Second)
proxies := map[uint32]*pkg.Proxy{}
for _, svc := range services {
for _, port := range svc.Ports {
fileData, fileErr := ioutil.ReadFile(path.Join(cfg.workingDir, "hosts"))
if fileErr != nil {
log.Println(fileErr)
return
listenPort := port.Port
if _, ok := proxies[listenPort]; ok {
return fmt.Errorf("port %d already allocated", listenPort)
}
host := ""
lines := strings.Split(string(fileData), "\n")
for _, line := range lines {
if strings.Index(line, "gateway") > -1 {
host = line[:strings.Index(line, "\t")]
hostIP := "0.0.0.0"
if len(port.HostIP) > 0 {
hostIP = port.HostIP
}
upstream := fmt.Sprintf("%s:%d", svc.Name, port.TargetPort)
proxies[listenPort] = pkg.NewProxy(upstream, listenPort, hostIP, timeout, localResolver)
}
}
log.Printf("[up] Sending %s to proxy\n", host)
gatewayURLChan <- host + ":8080"
close(gatewayURLChan)
}()
// wg.Add(len(proxies))
for _, v := range proxies {
go v.Start()
}
wg.Wait()
return nil

106
pkg/local_resolver.go Normal file
View File

@ -0,0 +1,106 @@
package pkg
import (
"io/ioutil"
"log"
"os"
"strings"
"sync"
"time"
)
type Resolver interface {
Start()
Get(upstream string, got chan<- string, timeout time.Duration)
}
type LocalResolver struct {
Path string
Map map[string]string
Mutex *sync.RWMutex
}
func NewLocalResolver(path string) Resolver {
return &LocalResolver{
Path: path,
Mutex: &sync.RWMutex{},
Map: make(map[string]string),
}
}
func (l *LocalResolver) Start() {
var lastStat os.FileInfo
for {
rebuild := false
if info, err := os.Stat(l.Path); err == nil {
if lastStat == nil {
rebuild = true
} else {
if !lastStat.ModTime().Equal(info.ModTime()) {
rebuild = true
}
}
lastStat = info
}
if rebuild {
log.Printf("Resolver rebuilding map")
l.rebuild()
}
time.Sleep(time.Second * 3)
}
}
func (l *LocalResolver) rebuild() {
l.Mutex.Lock()
defer l.Mutex.Unlock()
fileData, fileErr := ioutil.ReadFile(l.Path)
if fileErr != nil {
log.Printf("resolver rebuild error: %s", fileErr.Error())
return
}
lines := strings.Split(string(fileData), "\n")
for _, line := range lines {
index := strings.Index(line, "\t")
if len(line) > 0 && index > -1 {
ip := line[:index]
host := line[index+1:]
log.Printf("Resolver: %q=%q", host, ip)
l.Map[host] = ip
}
}
}
// Get resolve an entry
func (l *LocalResolver) Get(upstream string, got chan<- string, timeout time.Duration) {
start := time.Now()
for {
if val := l.get(upstream); len(val) > 0 {
got <- val
break
}
if time.Now().After(start.Add(timeout)) {
log.Printf("Timed out after %s getting host %q", timeout.String(), upstream)
break
}
time.Sleep(time.Millisecond * 250)
}
}
func (l *LocalResolver) get(upstream string) string {
l.Mutex.RLock()
defer l.Mutex.RUnlock()
if val, ok := l.Map[upstream]; ok {
return val
}
return ""
}

View File

@ -6,74 +6,91 @@ import (
"log"
"net"
"net/http"
"strconv"
"strings"
"time"
)
// NewProxy creates a HTTP proxy to expose the gateway container
// from OpenFaaS to the host
func NewProxy(port int, timeout time.Duration) *Proxy {
// NewProxy creates a HTTP proxy to expose a host
func NewProxy(upstream string, listenPort uint32, hostIP string, timeout time.Duration, resolver Resolver) *Proxy {
return &Proxy{
Port: port,
Upstream: upstream,
Port: listenPort,
HostIP: hostIP,
Timeout: timeout,
Resolver: resolver,
}
}
// Proxy for exposing a private container
type Proxy struct {
Timeout time.Duration
Port int
}
type proxyState struct {
Host string
// Port on which to listen to traffic
Port uint32
// Upstream is where to send traffic when received
Upstream string
// The IP to use to bind locally
HostIP string
Resolver Resolver
}
// Start listening and forwarding HTTP to the host
func (p *Proxy) Start(gatewayChan chan string, done chan bool) error {
tcp := p.Port
func (p *Proxy) Start() error {
http.DefaultClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
ps := proxyState{
Host: "",
upstreamHost, upstreamPort, err := getUpstream(p.Upstream, p.Port)
if err != nil {
return err
}
ps.Host = <-gatewayChan
log.Printf("Looking up IP for: %q", upstreamHost)
got := make(chan string, 1)
log.Printf("Starting faasd proxy on %d\n", tcp)
go p.Resolver.Get(upstreamHost, got, time.Second*5)
fmt.Printf("Gateway: %s\n", ps.Host)
ipAddress := <-got
close(got)
l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp))
upstreamAddr := fmt.Sprintf("%s:%d", ipAddress, upstreamPort)
localBind := fmt.Sprintf("%s:%d", p.HostIP, p.Port)
log.Printf("Proxy from: %s, to: %s (%s)\n", localBind, p.Upstream, ipAddress)
l, err := net.Listen("tcp", localBind)
if err != nil {
log.Printf("Error: %s", err.Error())
return err
}
defer l.Close()
for {
// Wait for a connection.
conn, err := l.Accept()
if err != nil {
acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error())
acceptErr := fmt.Errorf("Unable to accept on %d, error: %s",
p.Port,
err.Error())
log.Printf("%s", acceptErr.Error())
return acceptErr
}
upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host))
upstream, err := net.Dial("tcp", upstreamAddr)
if err != nil {
log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error())
log.Printf("unable to dial to %s, error: %s", upstreamAddr, err.Error())
return err
}
go pipe(conn, upstream)
go pipe(upstream, conn)
}
}
@ -81,3 +98,19 @@ func pipe(from net.Conn, to net.Conn) {
defer from.Close()
io.Copy(from, to)
}
func getUpstream(val string, defaultPort uint32) (string, uint32, error) {
upstreamHostname := val
upstreamPort := defaultPort
if in := strings.Index(val, ":"); in > -1 {
upstreamHostname = val[:in]
port, err := strconv.ParseInt(val[in+1:], 10, 32)
if err != nil {
return "", defaultPort, err
}
upstreamPort = uint32(port)
}
return upstreamHostname, upstreamPort, nil
}