From 01841f605c79a3ea2dbde423db22f037bc4f5ef0 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Thu, 23 Jun 2022 10:53:20 +0100 Subject: [PATCH] Use sync package from unofficial Go library Uses the sync package from the unofficial Go library instead of simpler solution. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- gateway/go.mod | 1 + gateway/go.sum | 2 + gateway/scaling/function_query.go | 8 +- gateway/scaling/function_scaler.go | 14 +- gateway/vendor/golang.org/x/sync/AUTHORS | 3 + gateway/vendor/golang.org/x/sync/CONTRIBUTORS | 3 + gateway/vendor/golang.org/x/sync/LICENSE | 27 +++ gateway/vendor/golang.org/x/sync/PATENTS | 22 ++ .../x/sync/singleflight/singleflight.go | 212 ++++++++++++++++++ gateway/vendor/modules.txt | 3 + 10 files changed, 286 insertions(+), 9 deletions(-) create mode 100644 gateway/vendor/golang.org/x/sync/AUTHORS create mode 100644 gateway/vendor/golang.org/x/sync/CONTRIBUTORS create mode 100644 gateway/vendor/golang.org/x/sync/LICENSE create mode 100644 gateway/vendor/golang.org/x/sync/PATENTS create mode 100644 gateway/vendor/golang.org/x/sync/singleflight/singleflight.go diff --git a/gateway/go.mod b/gateway/go.mod index 7cba514a..7ba281b8 100644 --- a/gateway/go.mod +++ b/gateway/go.mod @@ -11,4 +11,5 @@ require ( github.com/prometheus/client_golang v1.11.1 github.com/prometheus/client_model v0.2.0 go.uber.org/goleak v1.1.10 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect ) diff --git a/gateway/go.sum b/gateway/go.sum index b201d69d..9ea979d6 100644 --- a/gateway/go.sum +++ b/gateway/go.sum @@ -204,6 +204,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/gateway/scaling/function_query.go b/gateway/scaling/function_query.go index 7f40b448..ffb3b3c4 100644 --- a/gateway/scaling/function_query.go +++ b/gateway/scaling/function_query.go @@ -6,13 +6,15 @@ package scaling import ( "fmt" "log" + + "golang.org/x/sync/singleflight" ) type CachedFunctionQuery struct { cache FunctionCacher serviceQuery ServiceQuery emptyAnnotations map[string]string - singleFlight *SingleFlight + singleFlight *singleflight.Group } func NewCachedFunctionQuery(cache FunctionCacher, serviceQuery ServiceQuery) FunctionQuery { @@ -20,7 +22,7 @@ func NewCachedFunctionQuery(cache FunctionCacher, serviceQuery ServiceQuery) Fun cache: cache, serviceQuery: serviceQuery, emptyAnnotations: map[string]string{}, - singleFlight: NewSingleFlight(), + singleFlight: &singleflight.Group{}, } } @@ -41,7 +43,7 @@ func (c *CachedFunctionQuery) Get(fn string, ns string) (ServiceQueryResponse, e query, hit := c.cache.Get(fn, ns) if !hit { key := fmt.Sprintf("GetReplicas-%s.%s", fn, ns) - queryResponse, err := c.singleFlight.Do(key, func() (interface{}, error) { + queryResponse, err, _ := c.singleFlight.Do(key, func() (interface{}, error) { log.Printf("Cache miss - run GetReplicas") // If there is a cache miss, then fetch the value from the provider API return c.serviceQuery.GetReplicas(fn, ns) diff --git a/gateway/scaling/function_scaler.go b/gateway/scaling/function_scaler.go index 8a3fb824..f59a5a1f 100644 --- a/gateway/scaling/function_scaler.go +++ b/gateway/scaling/function_scaler.go @@ -4,6 +4,8 @@ import ( "fmt" "log" "time" + + "golang.org/x/sync/singleflight" ) // NewFunctionScaler create a new scaler with the specified @@ -12,7 +14,7 @@ func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) Func return FunctionScaler{ Cache: functionCacher, Config: config, - SingleFlight: NewSingleFlight(), + SingleFlight: &singleflight.Group{}, } } @@ -20,7 +22,7 @@ func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) Func type FunctionScaler struct { Cache FunctionCacher Config ScalingConfig - SingleFlight *SingleFlight + SingleFlight *singleflight.Group } // FunctionScaleResult holds the result of scaling from zero @@ -47,7 +49,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu } getKey := fmt.Sprintf("GetReplicas-%s.%s", functionName, namespace) - res, err := f.SingleFlight.Do(getKey, func() (interface{}, error) { + res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { return f.Config.ServiceQuery.GetReplicas(functionName, namespace) }) @@ -80,7 +82,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu scaleResult := backoff(func(attempt int) error { - res, err := f.SingleFlight.Do(getKey, func() (interface{}, error) { + res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { return f.Config.ServiceQuery.GetReplicas(functionName, namespace) }) @@ -98,7 +100,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu setKey := fmt.Sprintf("SetReplicas-%s.%s", functionName, namespace) - if _, err := f.SingleFlight.Do(setKey, func() (interface{}, error) { + if _, err, _ := f.SingleFlight.Do(setKey, func() (interface{}, error) { log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas) @@ -125,7 +127,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu for i := 0; i < int(f.Config.MaxPollCount); i++ { - res, err := f.SingleFlight.Do(getKey, func() (interface{}, error) { + res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) { return f.Config.ServiceQuery.GetReplicas(functionName, namespace) }) queryResponse := res.(ServiceQueryResponse) diff --git a/gateway/vendor/golang.org/x/sync/AUTHORS b/gateway/vendor/golang.org/x/sync/AUTHORS new file mode 100644 index 00000000..15167cd7 --- /dev/null +++ b/gateway/vendor/golang.org/x/sync/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/gateway/vendor/golang.org/x/sync/CONTRIBUTORS b/gateway/vendor/golang.org/x/sync/CONTRIBUTORS new file mode 100644 index 00000000..1c4577e9 --- /dev/null +++ b/gateway/vendor/golang.org/x/sync/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/gateway/vendor/golang.org/x/sync/LICENSE b/gateway/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 00000000..6a66aea5 --- /dev/null +++ b/gateway/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/gateway/vendor/golang.org/x/sync/PATENTS b/gateway/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 00000000..73309904 --- /dev/null +++ b/gateway/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/gateway/vendor/golang.org/x/sync/singleflight/singleflight.go b/gateway/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 00000000..690eb850 --- /dev/null +++ b/gateway/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,212 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // forgotten indicates whether Forget was called with this call's key + // while the call was still in flight. + forgotten bool + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + if c, ok := g.m[key]; ok { + c.forgotten = true + } + delete(g.m, key) + g.mu.Unlock() +} diff --git a/gateway/vendor/modules.txt b/gateway/vendor/modules.txt index 649419a4..7140f0b0 100644 --- a/gateway/vendor/modules.txt +++ b/gateway/vendor/modules.txt @@ -67,6 +67,9 @@ golang.org/x/crypto/ed25519/internal/edwards25519 # golang.org/x/lint v0.0.0-20190930215403-16217165b5de golang.org/x/lint golang.org/x/lint/golint +# golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f +## explicit +golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 golang.org/x/sys/internal/unsafeheader golang.org/x/sys/unix