External replica proxy

This commit is contained in:
Alex Ellis 2017-07-27 19:37:55 +01:00
parent 33381a783a
commit f165ce2ca7
5 changed files with 92 additions and 9 deletions

View File

@ -9,6 +9,7 @@ COPY requests requests
COPY tests tests
COPY server.go .
COPY types types
COPY plugin plugin
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o gateway .

View File

@ -115,16 +115,16 @@ func MakeAlertHandler(sq ServiceQuery) http.HandlerFunc {
errors := handleAlerts(&req, sq)
if len(errors) > 0 {
log.Println(errors)
w.WriteHeader(http.StatusInternalServerError)
var errorOutput string
for d, err := range errors {
errorOutput += fmt.Sprintf("[%d] %s\n", d, err)
}
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(errorOutput))
} else {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusOK)
}
}

View File

@ -1,31 +1,114 @@
package plugin
import (
"bytes"
"encoding/json"
"log"
"net"
"net/http"
"net/url"
"time"
"fmt"
"io/ioutil"
"github.com/alexellis/faas/gateway/handlers"
"github.com/alexellis/faas/gateway/requests"
)
// NewExternalServiceQuery proxies service queries to external plugin via HTTP
func NewExternalServiceQuery(externalURL url.URL) handlers.ServiceQuery {
proxyClient := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 3 * time.Second,
KeepAlive: 0,
}).DialContext,
MaxIdleConns: 1,
DisableKeepAlives: true,
IdleConnTimeout: 120 * time.Millisecond,
ExpectContinueTimeout: 1500 * time.Millisecond,
},
}
return ExternalServiceQuery{
URL: externalURL,
ProxyClient: proxyClient,
}
}
// ExternalServiceQuery proxies service queries to external plugin via HTTP
type ExternalServiceQuery struct {
URL url.URL
ProxyClient http.Client
}
const maxReplicas = 40
// GetReplicas replica count for function
func (s ExternalServiceQuery) GetReplicas(serviceName string) (uint64, uint64, error) {
var err error
function := requests.Function{}
return 0, 0, err
urlPath := fmt.Sprintf("%ssystem/function/%s", s.URL.String(), serviceName)
req, _ := http.NewRequest("GET", urlPath, nil)
res, err := s.ProxyClient.Do(req)
if err != nil {
log.Println(urlPath, err)
}
if res.StatusCode == 200 {
if res.Body != nil {
defer res.Body.Close()
bytesOut, _ := ioutil.ReadAll(res.Body)
err = json.Unmarshal(bytesOut, &function)
if err != nil {
log.Println(urlPath, err)
}
}
}
max := uint64(maxReplicas)
return function.Replicas, max, err
}
// ScaleServiceRequest request scaling of replica
type ScaleServiceRequest struct {
ServiceName string `json:"serviceName"`
Replicas uint64 `json:"replicas"`
}
// SetReplicas update the replica count
func (s ExternalServiceQuery) SetReplicas(serviceName string, count uint64) error {
var err error
scaleReq := ScaleServiceRequest{
ServiceName: serviceName,
Replicas: count,
}
requestBody, err := json.Marshal(scaleReq)
if err != nil {
return err
}
urlPath := fmt.Sprintf("%ssystem/scale-function/%s", s.URL.String(), serviceName)
req, _ := http.NewRequest("POST", urlPath, bytes.NewReader(requestBody))
defer req.Body.Close()
res, err := s.ProxyClient.Do(req)
defer res.Body.Close()
if err != nil {
log.Println(urlPath, err)
}
if res.StatusCode != http.StatusOK {
err = fmt.Errorf("error scaling HTTP code %d, %s", res.StatusCode, urlPath)
}
return err
}

View File

@ -73,6 +73,6 @@ type PrometheusAlert struct {
type Function struct {
Name string `json:"name"`
Image string `json:"image"`
InvocationCount float64 `json:"invocationCount"`
InvocationCount float64 `json:"invocationCount"` // TODO: shouldn't this be int64?
Replicas uint64 `json:"replicas"`
}

View File

@ -143,7 +143,6 @@ func makeHandler(proxy *httputil.ReverseProxy, metrics *metrics.MetricOptions) h
metrics.GatewayFunctionsHistogram.WithLabelValues(service).Observe(seconds)
code := writeAdapter.GetHeaderCode()
metrics.GatewayFunctionInvocation.With(prometheus.Labels{"function_name": service, "code": strconv.Itoa(code)}).Inc()
}
}
}