Use sync package from unofficial Go library

Uses the sync package from the unofficial Go library instead
of simpler solution.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2022-06-23 10:53:20 +01:00 committed by Alex Ellis
parent 6ed0ab71fb
commit 01841f605c
10 changed files with 286 additions and 9 deletions

View File

@ -11,4 +11,5 @@ require (
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
)

View File

@ -204,6 +204,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -6,13 +6,15 @@ package scaling
import (
"fmt"
"log"
"golang.org/x/sync/singleflight"
)
type CachedFunctionQuery struct {
cache FunctionCacher
serviceQuery ServiceQuery
emptyAnnotations map[string]string
singleFlight *SingleFlight
singleFlight *singleflight.Group
}
func NewCachedFunctionQuery(cache FunctionCacher, serviceQuery ServiceQuery) FunctionQuery {
@ -20,7 +22,7 @@ func NewCachedFunctionQuery(cache FunctionCacher, serviceQuery ServiceQuery) Fun
cache: cache,
serviceQuery: serviceQuery,
emptyAnnotations: map[string]string{},
singleFlight: NewSingleFlight(),
singleFlight: &singleflight.Group{},
}
}
@ -41,7 +43,7 @@ func (c *CachedFunctionQuery) Get(fn string, ns string) (ServiceQueryResponse, e
query, hit := c.cache.Get(fn, ns)
if !hit {
key := fmt.Sprintf("GetReplicas-%s.%s", fn, ns)
queryResponse, err := c.singleFlight.Do(key, func() (interface{}, error) {
queryResponse, err, _ := c.singleFlight.Do(key, func() (interface{}, error) {
log.Printf("Cache miss - run GetReplicas")
// If there is a cache miss, then fetch the value from the provider API
return c.serviceQuery.GetReplicas(fn, ns)

View File

@ -4,6 +4,8 @@ import (
"fmt"
"log"
"time"
"golang.org/x/sync/singleflight"
)
// NewFunctionScaler create a new scaler with the specified
@ -12,7 +14,7 @@ func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) Func
return FunctionScaler{
Cache: functionCacher,
Config: config,
SingleFlight: NewSingleFlight(),
SingleFlight: &singleflight.Group{},
}
}
@ -20,7 +22,7 @@ func NewFunctionScaler(config ScalingConfig, functionCacher FunctionCacher) Func
type FunctionScaler struct {
Cache FunctionCacher
Config ScalingConfig
SingleFlight *SingleFlight
SingleFlight *singleflight.Group
}
// FunctionScaleResult holds the result of scaling from zero
@ -47,7 +49,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
}
getKey := fmt.Sprintf("GetReplicas-%s.%s", functionName, namespace)
res, err := f.SingleFlight.Do(getKey, func() (interface{}, error) {
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
return f.Config.ServiceQuery.GetReplicas(functionName, namespace)
})
@ -80,7 +82,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
scaleResult := backoff(func(attempt int) error {
res, err := f.SingleFlight.Do(getKey, func() (interface{}, error) {
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
return f.Config.ServiceQuery.GetReplicas(functionName, namespace)
})
@ -98,7 +100,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
setKey := fmt.Sprintf("SetReplicas-%s.%s", functionName, namespace)
if _, err := f.SingleFlight.Do(setKey, func() (interface{}, error) {
if _, err, _ := f.SingleFlight.Do(setKey, func() (interface{}, error) {
log.Printf("[Scale %d] function=%s 0 => %d requested", attempt, functionName, minReplicas)
@ -125,7 +127,7 @@ func (f *FunctionScaler) Scale(functionName, namespace string) FunctionScaleResu
for i := 0; i < int(f.Config.MaxPollCount); i++ {
res, err := f.SingleFlight.Do(getKey, func() (interface{}, error) {
res, err, _ := f.SingleFlight.Do(getKey, func() (interface{}, error) {
return f.Config.ServiceQuery.GetReplicas(functionName, namespace)
})
queryResponse := res.(ServiceQueryResponse)

3
gateway/vendor/golang.org/x/sync/AUTHORS generated vendored Normal file
View File

@ -0,0 +1,3 @@
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.

3
gateway/vendor/golang.org/x/sync/CONTRIBUTORS generated vendored Normal file
View File

@ -0,0 +1,3 @@
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.

27
gateway/vendor/golang.org/x/sync/LICENSE generated vendored Normal file
View File

@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
gateway/vendor/golang.org/x/sync/PATENTS generated vendored Normal file
View File

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

View File

@ -0,0 +1,212 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}

View File

@ -67,6 +67,9 @@ golang.org/x/crypto/ed25519/internal/edwards25519
# golang.org/x/lint v0.0.0-20190930215403-16217165b5de
golang.org/x/lint
golang.org/x/lint/golint
# golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
## explicit
golang.org/x/sync/singleflight
# golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1
golang.org/x/sys/internal/unsafeheader
golang.org/x/sys/unix