Feature for probing functions

Introduces a single-flight call to a function's health
endpoint to verify that it is registered with an Istio
sidecar (Envoy) before letting the invocation through.

Results are cached for 5 seconds before a new probe is
required.

Tested without Istio: with the probe_functions environment
variable set to true, I saw a probe execute in the logs.

Fixes: #1721 for Istio users.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alex@openfaas.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2022-06-29 10:09:01 +01:00
committed by Alex Ellis
parent 01841f605c
commit 88eea5f62e
43 changed files with 784 additions and 468 deletions

View File

@ -32,7 +32,7 @@ type ExternalServiceQuery struct {
// NewExternalServiceQuery proxies service queries to external plugin via HTTP
func NewExternalServiceQuery(externalURL url.URL, authInjector middleware.AuthInjector) scaling.ServiceQuery {
timeout := 5 * time.Second
timeout := 3 * time.Second
proxyClient := http.Client{
Transport: &http.Transport{
@ -82,35 +82,28 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string)
res, err := s.ProxyClient.Do(req)
if err != nil {
log.Printf("Unable to connect to %s, error: %s", urlPath, err)
log.Println(urlPath, err)
return emptyServiceQueryResponse, err
}
var bytesOut []byte
if res.Body != nil {
bytesOut, _ = ioutil.ReadAll(res.Body)
defer res.Body.Close()
}
if res.StatusCode == http.StatusOK {
if err := json.Unmarshal(bytesOut, &function); err != nil {
log.Printf("Unable to unmarshal: %q, %s", string(bytesOut), err)
return emptyServiceQueryResponse, err
}
// log.Printf("GetReplicas [%s.%s] took: %fs", serviceName, serviceNamespace, time.Since(start).Seconds())
} else {
var body []byte
if res.Body != nil {
defer res.Body.Close()
body, _ = ioutil.ReadAll(res.Body)
}
if res.StatusCode == http.StatusOK {
err = json.Unmarshal(body, &function)
if err != nil {
log.Printf("Unable to unmarshal %s, error: %s", string(body), err)
}
log.Printf("GetReplicas [%s.%s] took: %fs",
serviceName,
serviceNamespace,
time.Since(start).Seconds())
} else {
log.Printf("GetReplicas [%s.%s] took: %fs, code: %d",
serviceName,
serviceNamespace,
time.Since(start).Seconds(),
res.StatusCode)
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName)
}
log.Printf("GetReplicas [%s.%s] took: %fs, code: %d\n", serviceName, serviceNamespace, time.Since(start).Seconds(), res.StatusCode)
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s, body: %s", res.StatusCode, serviceName, string(bytesOut))
}
minReplicas := uint64(scaling.DefaultMinReplicas)
@ -128,7 +121,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName, serviceNamespace string)
extractedScalingFactor := extractLabelValue(labels[scaling.ScalingFactorLabel], scalingFactor)
targetLoad = extractLabelValue(labels[scaling.TargetLoadLabel], targetLoad)
if extractedScalingFactor > 0 && extractedScalingFactor <= 100 {
if extractedScalingFactor >= 0 && extractedScalingFactor <= 100 {
scalingFactor = extractedScalingFactor
} else {
log.Printf("Bad Scaling Factor: %d, is not in range of [0 - 100]. Will fallback to %d", extractedScalingFactor, scalingFactor)