From c314af4f981b8084141aafb91b44f812e80bd4f1 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Fri, 18 Sep 2020 11:38:28 +0100 Subject: [PATCH] Add local resolver for system containers System containers can now be proxied to the localhost or to all adapters using docker-compose. Tested with NATS and Prometheus to 127.0.0.1 in multipass and with the gateway to 0.0.0.0. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- cmd/up.go | 49 +++++++++---------- pkg/local_resolver.go | 106 ++++++++++++++++++++++++++++++++++++++++++ pkg/proxy.go | 77 +++++++++++++++++++++--------- 3 files changed, 186 insertions(+), 46 deletions(-) create mode 100644 pkg/local_resolver.go diff --git a/cmd/up.go b/cmd/up.go index ca39347..f4ebd87 100644 --- a/cmd/up.go +++ b/cmd/up.go @@ -7,7 +7,6 @@ import ( "os" "os/signal" "path" - "strings" "sync" "syscall" "time" @@ -85,7 +84,7 @@ func runUp(cmd *cobra.Command, _ []string) error { shutdownTimeout := time.Second * 1 timeout := time.Second * 60 - proxyDoneCh := make(chan bool) + // proxyDoneCh := make(chan bool) wg := sync.WaitGroup{} wg.Add(1) @@ -103,37 +102,39 @@ func runUp(cmd *cobra.Command, _ []string) error { } // Close proxy - proxyDoneCh <- true + // proxyDoneCh <- true time.AfterFunc(shutdownTimeout, func() { wg.Done() }) }() - gatewayURLChan := make(chan string, 1) - proxyPort := 8080 - proxy := pkg.NewProxy(proxyPort, timeout) - go proxy.Start(gatewayURLChan, proxyDoneCh) + localResolver := pkg.NewLocalResolver(path.Join(cfg.workingDir, "hosts")) + go localResolver.Start() - go func() { - time.Sleep(3 * time.Second) + proxies := map[uint32]*pkg.Proxy{} + for _, svc := range services { + for _, port := range svc.Ports { - fileData, fileErr := ioutil.ReadFile(path.Join(cfg.workingDir, "hosts")) - if fileErr != nil { - log.Println(fileErr) - return - } - - host := "" - lines := strings.Split(string(fileData), "\n") - for _, line := range lines { - if strings.Index(line, "gateway") > -1 { - host = line[:strings.Index(line, "\t")] + listenPort := port.Port + if _, ok := proxies[listenPort]; ok { + return fmt.Errorf("port %d already allocated", listenPort) } + + hostIP := "0.0.0.0" + if len(port.HostIP) > 0 { + hostIP = port.HostIP + } + + upstream := fmt.Sprintf("%s:%d", svc.Name, port.TargetPort) + proxies[listenPort] = pkg.NewProxy(upstream, listenPort, hostIP, timeout, localResolver) } - log.Printf("[up] Sending %s to proxy\n", host) - gatewayURLChan <- host + ":8080" - close(gatewayURLChan) - }() + } + + // wg.Add(len(proxies)) + + for _, v := range proxies { + go v.Start() + } wg.Wait() return nil diff --git a/pkg/local_resolver.go b/pkg/local_resolver.go new file mode 100644 index 0000000..df9a113 --- /dev/null +++ b/pkg/local_resolver.go @@ -0,0 +1,106 @@ +package pkg + +import ( + "io/ioutil" + "log" + "os" + "strings" + "sync" + "time" +) + +type Resolver interface { + Start() + Get(upstream string, got chan<- string, timeout time.Duration) +} + +type LocalResolver struct { + Path string + Map map[string]string + Mutex *sync.RWMutex +} + +func NewLocalResolver(path string) Resolver { + return &LocalResolver{ + Path: path, + Mutex: &sync.RWMutex{}, + Map: make(map[string]string), + } +} + +func (l *LocalResolver) Start() { + var lastStat os.FileInfo + + for { + rebuild := false + if info, err := os.Stat(l.Path); err == nil { + if lastStat == nil { + rebuild = true + } else { + if !lastStat.ModTime().Equal(info.ModTime()) { + rebuild = true + } + } + lastStat = info + } + + if rebuild { + log.Printf("Resolver rebuilding map") + l.rebuild() + } + time.Sleep(time.Second * 3) + } +} + +func (l *LocalResolver) rebuild() { + l.Mutex.Lock() + defer l.Mutex.Unlock() + + fileData, fileErr := ioutil.ReadFile(l.Path) + if fileErr != nil { + log.Printf("resolver rebuild error: %s", fileErr.Error()) + return + } + + lines := strings.Split(string(fileData), "\n") + + for _, line := range lines { + index := strings.Index(line, "\t") + + if len(line) > 0 && index > -1 { + ip := line[:index] + host := line[index+1:] + log.Printf("Resolver: %q=%q", host, ip) + l.Map[host] = ip + } + } +} + +// Get resolve an entry +func (l *LocalResolver) Get(upstream string, got chan<- string, timeout time.Duration) { + start := time.Now() + for { + if val := l.get(upstream); len(val) > 0 { + got <- val + break + } + + if time.Now().After(start.Add(timeout)) { + log.Printf("Timed out after %s getting host %q", timeout.String(), upstream) + break + } + + time.Sleep(time.Millisecond * 250) + } +} + +func (l *LocalResolver) get(upstream string) string { + l.Mutex.RLock() + defer l.Mutex.RUnlock() + + if val, ok := l.Map[upstream]; ok { + return val + } + + return "" +} diff --git a/pkg/proxy.go b/pkg/proxy.go index 3feaaef..622554a 100644 --- a/pkg/proxy.go +++ b/pkg/proxy.go @@ -6,74 +6,91 @@ import ( "log" "net" "net/http" + "strconv" + "strings" "time" ) -// NewProxy creates a HTTP proxy to expose the gateway container -// from OpenFaaS to the host -func NewProxy(port int, timeout time.Duration) *Proxy { +// NewProxy creates a HTTP proxy to expose a host +func NewProxy(upstream string, listenPort uint32, hostIP string, timeout time.Duration, resolver Resolver) *Proxy { return &Proxy{ - Port: port, - Timeout: timeout, + Upstream: upstream, + Port: listenPort, + HostIP: hostIP, + Timeout: timeout, + Resolver: resolver, } } // Proxy for exposing a private container type Proxy struct { Timeout time.Duration - Port int -} -type proxyState struct { - Host string + // Port on which to listen to traffic + Port uint32 + + // Upstream is where to send traffic when received + Upstream string + + // The IP to use to bind locally + HostIP string + + Resolver Resolver } // Start listening and forwarding HTTP to the host -func (p *Proxy) Start(gatewayChan chan string, done chan bool) error { - tcp := p.Port +func (p *Proxy) Start() error { http.DefaultClient.CheckRedirect = func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse } - ps := proxyState{ - Host: "", + upstreamHost, upstreamPort, err := getUpstream(p.Upstream, p.Port) + if err != nil { + return err } - ps.Host = <-gatewayChan + log.Printf("Looking up IP for: %q", upstreamHost) + got := make(chan string, 1) - log.Printf("Starting faasd proxy on %d\n", tcp) + go p.Resolver.Get(upstreamHost, got, time.Second*5) - fmt.Printf("Gateway: %s\n", ps.Host) + ipAddress := <-got + close(got) - l, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tcp)) + upstreamAddr := fmt.Sprintf("%s:%d", ipAddress, upstreamPort) + + localBind := fmt.Sprintf("%s:%d", p.HostIP, p.Port) + log.Printf("Proxy from: %s, to: %s (%s)\n", localBind, p.Upstream, ipAddress) + + l, err := net.Listen("tcp", localBind) if err != nil { log.Printf("Error: %s", err.Error()) return err } defer l.Close() - for { // Wait for a connection. conn, err := l.Accept() if err != nil { - acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", tcp, err.Error()) + acceptErr := fmt.Errorf("Unable to accept on %d, error: %s", + p.Port, + err.Error()) log.Printf("%s", acceptErr.Error()) return acceptErr } - upstream, err := net.Dial("tcp", fmt.Sprintf("%s", ps.Host)) + upstream, err := net.Dial("tcp", upstreamAddr) if err != nil { - log.Printf("unable to dial to %s, error: %s", ps.Host, err.Error()) + log.Printf("unable to dial to %s, error: %s", upstreamAddr, err.Error()) return err } go pipe(conn, upstream) go pipe(upstream, conn) - } } @@ -81,3 +98,19 @@ func pipe(from net.Conn, to net.Conn) { defer from.Close() io.Copy(from, to) } + +func getUpstream(val string, defaultPort uint32) (string, uint32, error) { + upstreamHostname := val + upstreamPort := defaultPort + + if in := strings.Index(val, ":"); in > -1 { + upstreamHostname = val[:in] + port, err := strconv.ParseInt(val[in+1:], 10, 32) + if err != nil { + return "", defaultPort, err + } + upstreamPort = uint32(port) + } + + return upstreamHostname, upstreamPort, nil +}