Compare commits


2 Commits

Author SHA1 Message Date
bbd3b4ff07 Add comment to explain how method works
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2021-11-01 11:04:38 +00:00
1d07fda0a4 Wait for a function to become healthy in scale-up event
Prior to this change, after scaling a function up and
returning from the API call, the function might still not be
ready to serve traffic. This resulted in HTTP errors for a
percentage of requests, especially if the task had been
deleted rather than just paused.

Unpausing was instant, but during re-creation the function
needs some time to start up.

This change puts a health check into the hot path for the
scale event. It is blocking, so scaling up incurs some
additional latency, but the call returns with a ready
endpoint far more often than before.

This approach means that faasd doesn't have to run exec or
HTTP health checks continually, spending CPU on each of
them even when a function is idle.

Tested with the nodeinfo function, by killing the task
and then invoking the function. Prior to this change, the
function would sometimes return an error code.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2021-11-01 11:00:39 +00:00
6 changed files with 103 additions and 132 deletions


@@ -45,7 +45,7 @@ Additional resources:
Most importantly, it's easy to manage so you can set it up and leave it alone to run your functions.
[![demo](https://pbs.twimg.com/media/EPNQz00W4AEwDxM?format=jpg&name=medium)](https://www.youtube.com/watch?v=WX1tZoSXy8E)
![demo](https://pbs.twimg.com/media/EPNQz00W4AEwDxM?format=jpg&name=small)
> Demo of faasd running asynchronous functions
@@ -167,6 +167,4 @@ For completed features, WIP and upcoming roadmap see:
See [ROADMAP.md](docs/ROADMAP.md)
Want to build a patch without setting up a complete development environment? See [docs/PATCHES.md](docs/PATCHES.md)
Are you looking to hack on faasd? Follow the [developer instructions](docs/DEV.md) for a manual installation, or use the `hack/install.sh` script and pick up from there.


@@ -98,7 +98,7 @@ func makeProviderCmd() *cobra.Command {
DeployHandler: handlers.MakeDeployHandler(client, cni, baseUserSecretsPath, alwaysPull),
FunctionReader: handlers.MakeReadHandler(client),
ReplicaReader: handlers.MakeReplicaReaderHandler(client),
ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni),
ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni, invokeResolver),
UpdateHandler: handlers.MakeUpdateHandler(client, cni, baseUserSecretsPath, alwaysPull),
HealthHandler: func(w http.ResponseWriter, r *http.Request) {},
InfoHandler: handlers.MakeInfoHandler(Version, GitCommit),
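The provider now passes `invokeResolver` into `MakeReplicaUpdateHandler` so the handler can look up where a function is listening. As a rough illustration of what a resolver provides, here is a minimal map-backed implementation. It assumes the resolver method is `Resolve(functionName string) (url.URL, error)` (consistent with how `endpoint.Host` is used later in this diff), redeclares that interface locally so the snippet builds on its own, and uses a hypothetical `staticResolver` type and example IP address; faasd's real resolver is not shown here.

```go
package main

import (
	"fmt"
	"net/url"
)

// BaseURLResolver is a local stand-in assumed to match the method shape of
// proxy.BaseURLResolver from github.com/openfaas/faas-provider.
type BaseURLResolver interface {
	Resolve(functionName string) (url.URL, error)
}

// staticResolver is a hypothetical, map-backed resolver, e.g. for unit tests
// that exercise the replica updater without containerd or CNI.
type staticResolver map[string]string

// Resolve looks up the function's base URL and parses it.
func (s staticResolver) Resolve(name string) (url.URL, error) {
	raw, ok := s[name]
	if !ok {
		return url.URL{}, fmt.Errorf("function %s not found", name)
	}
	u, err := url.Parse(raw)
	if err != nil {
		return url.URL{}, err
	}
	return *u, nil
}

func main() {
	var r BaseURLResolver = staticResolver{"nodeinfo": "http://10.62.0.5:8080"}
	u, err := r.Resolve("nodeinfo")
	if err != nil {
		panic(err)
	}
	fmt.Println(u.Host) // 10.62.0.5:8080
}
```

Injecting the resolver as an interface, rather than calling a concrete lookup, is what makes a fake like this possible in tests.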


@@ -1,11 +1,7 @@
## Instructions for building and testing faasd locally
## Instructions for hacking on faasd itself
> Note: if you're just wanting to try out faasd, then it's likely that you're on the wrong page. This is a detailed set of instructions for those wanting to contribute or customise faasd. Feel free to go back to the homepage and pick a tutorial instead.
Do you want to help the community test a pull request?
See these instructions instead: [Testing patches](/docs/PATCHES.md)
### Pre-reqs
> It's recommended that you do not install Docker on the same host as faasd, since 1) they may use different versions of containerd and 2) Docker's networking rules can disrupt faasd's networking. When using faasd, make your faasd server a faasd server, and build container images on your laptop or in a CI pipeline.


@@ -1,88 +0,0 @@
## Instructions for testing a patch for faasd
### Launch a virtual machine
You can use any kind of Linux virtual machine, Ubuntu 20.04 is recommended.
Launch a cloud VM or use [Multipass](https://multipass.run), which is free to use and can be run locally. A Raspberry Pi 3 or 4 could also be used, but you will need to run `make dist` to cross-compile a suitable binary.
### Copy over your SSH key
Your SSH key will be used so that you can copy a new faasd binary over to the VM.
```bash
multipass launch \
--mem 4G \
-c 2 \
-n faasd
# Then access its shell
multipass shell faasd
# Edit .ssh/authorized_keys
# Add .ssh/id_rsa.pub from your host and save the file
```
### Install faasd on the VM
You start off with the upstream version of faasd on the host, then add the new version over the top later on.
```bash
cd /tmp/
git clone https://github.com/openfaas/faasd --depth=1
cd faasd/hack
./install.sh
# Run the login command given to you at the end of the script
```
Get the multipass IP address:
```bash
export IP=$(multipass info faasd --format json | jq -r '.info.faasd.ipv4[0]')
```
### Build a new faasd binary with the patch
Check out faasd and the pull request on your local computer:
```bash
git clone https://github.com/openfaas/faasd
cd faasd
gh pr checkout PR_NUMBER_HERE  # replace with the pull request's number
GOOS=linux go build
# You can also run "make dist" which is slower, but includes
# a version and binaries for other platforms such as the Raspberry Pi
```
### Copy it over to the VM
Copy the new faasd binary to the VM:
```bash
scp faasd ubuntu@$IP:~/
```
Now deploy the new version from a shell on the VM (`multipass shell faasd`):
```bash
sudo systemctl stop faasd faasd-provider
sudo mv ./faasd /usr/local/bin/faasd
sudo systemctl start faasd faasd-provider
```
### Check it worked and test the patch
Now run a command with `faas-cli` such as:
* `faas-cli list`
* `faas-cli version`
See the testing instructions on the PR and run through those steps.
Post your results on GitHub to assist the creator of the pull request.
You can see how to get the logs for various components using the [eBook Serverless For Everyone Else](https://gumroad.com/l/serverless-for-everyone-else), or by consulting the [DEV.md](/docs/DEV.md) guide.


@@ -75,10 +75,10 @@ sudo systemctl restart faasd
Should have:
* [ ] Offer a recommendation or implement a strategy for faasd replication/HA
* [ ] Resolve core services from functions by populating/sharing `/etc/hosts` between `faasd` and `faasd-provider`
* [ ] Docs or examples on how to use the various connectors and connector-sdk
* [ ] Monitor and restart any of the core components at runtime if the container stops
* [ ] Asynchronous function deletion instead of synchronous
* [ ] Asynchronous function start-up instead of synchronous
* [ ] Asynchronous deletion instead of synchronous
Nice to Have:
@@ -89,8 +89,6 @@ Nice to Have:
### Completed
* [x] Docs or examples on how to use the various event connectors (Yes in the eBook)
* [x] Resolve core services from functions by populating/sharing `/etc/hosts` between `faasd` and `faasd-provider`
* [x] Provide a cloud-init configuration for faasd bootstrap
* [x] Configure core services from a docker-compose.yaml file
* [x] Store and fetch logs from the journal


@@ -6,16 +6,20 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas-provider/proxy"
"github.com/openfaas/faas-provider/types"
)
func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI, resolver proxy.BaseURLResolver) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
@@ -30,12 +34,9 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
log.Printf("[Scale] request: %s\n", string(body))
req := types.ScaleServiceRequest{}
err := json.Unmarshal(body, &req)
if err != nil {
if err := json.Unmarshal(body, &req); err != nil {
log.Printf("[Scale] error parsing input: %s\n", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
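The refactor above moves `err` into the `if` statement's own scope. A small sketch of the same parsing step follows. The `ScaleServiceRequest` struct here is a local stand-in for `types.ScaleServiceRequest`; its JSON tags are an assumption, as is the hypothetical `parseScaleRequest` helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ScaleServiceRequest is a local stand-in for types.ScaleServiceRequest;
// the JSON tags are assumed, not copied from faas-provider.
type ScaleServiceRequest struct {
	ServiceName string `json:"serviceName"`
	Replicas    uint64 `json:"replicas"`
	Namespace   string `json:"namespace,omitempty"`
}

// parseScaleRequest uses the scoped `if err := ...; err != nil` form from the
// patch, so err does not leak into the rest of the caller.
func parseScaleRequest(body []byte) (ScaleServiceRequest, error) {
	req := ScaleServiceRequest{}
	if err := json.Unmarshal(body, &req); err != nil {
		return ScaleServiceRequest{}, fmt.Errorf("error parsing input: %w", err)
	}
	return req, nil
}

func main() {
	req, err := parseScaleRequest([]byte(`{"serviceName":"nodeinfo","replicas":1}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.ServiceName, req.Replicas) // nodeinfo 1
}
```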
@@ -55,18 +56,23 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
name := req.ServiceName
if _, err := GetFunction(client, name, namespace); err != nil {
fn, err := GetFunction(client, name, namespace)
if err != nil {
msg := fmt.Sprintf("service %s not found", name)
log.Printf("[Scale] %s\n", msg)
http.Error(w, msg, http.StatusNotFound)
return
}
ctx := namespaces.WithNamespace(context.Background(), namespace)
healthPath := "/_/healthz"
if v := fn.annotations["com.openfaas.health.http.path"]; len(v) > 0 {
healthPath = v
}
ctr, ctrErr := client.LoadContainer(ctx, name)
if ctrErr != nil {
msg := fmt.Sprintf("cannot load service %s, error: %s", name, ctrErr)
ctx := namespaces.WithNamespace(context.Background(), namespace)
ctr, err := client.LoadContainer(ctx, name)
if err != nil {
msg := fmt.Sprintf("cannot load service %s, error: %s", name, err)
log.Printf("[Scale] %s\n", msg)
http.Error(w, msg, http.StatusNotFound)
return
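The handler now reads the health-check path from the function's annotations before loading the container. Pulled out into a hypothetical `healthPathFor` helper (not part of faasd), the lookup amounts to the following; the default path and annotation key are the ones used in the patch.

```go
package main

import "fmt"

const (
	// Default path and annotation key, as used in the patch.
	defaultHealthPath    = "/_/healthz"
	healthPathAnnotation = "com.openfaas.health.http.path"
)

// healthPathFor returns the annotation's value when it is non-empty,
// otherwise the default health path.
func healthPathFor(annotations map[string]string) string {
	if v := annotations[healthPathAnnotation]; len(v) > 0 {
		return v
	}
	return defaultHealthPath
}

func main() {
	fmt.Println(healthPathFor(nil)) // /_/healthz
	fmt.Println(healthPathFor(map[string]string{
		healthPathAnnotation: "/handlers/ready",
	})) // /handlers/ready
}
```

Indexing a nil map in Go yields the zero value, so a function deployed without any annotations falls back to the default.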
@@ -75,16 +81,16 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
var taskExists bool
var taskStatus *containerd.Status
task, taskErr := ctr.Task(ctx, nil)
if taskErr != nil {
msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr)
task, err := ctr.Task(ctx, nil)
if err != nil {
msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, err)
log.Printf("[Scale] %s\n", msg)
taskExists = false
} else {
taskExists = true
status, statusErr := task.Status(ctx)
if statusErr != nil {
msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr)
status, err := task.Status(ctx)
if err != nil {
msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, err)
log.Printf("[Scale] %s\n", msg)
http.Error(w, msg, http.StatusInternalServerError)
return
@@ -99,28 +105,31 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
if req.Replicas == 0 {
// If a task is running, pause it
if taskExists && taskStatus.Status == containerd.Running {
if pauseErr := task.Pause(ctx); pauseErr != nil {
wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr)
log.Printf("[Scale] %s\n", wrappedPauseErr.Error())
http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound)
if err := task.Pause(ctx); err != nil {
werr := fmt.Errorf("error pausing task %s, error: %s", name, err)
log.Printf("[Scale] %s\n", werr.Error())
http.Error(w, werr.Error(), http.StatusNotFound)
return
}
}
// Otherwise, no action is required
return
}
if taskExists {
if taskStatus != nil {
if taskStatus.Status == containerd.Paused {
if resumeErr := task.Resume(ctx); resumeErr != nil {
log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr)
http.Error(w, resumeErr.Error(), http.StatusBadRequest)
if err := task.Resume(ctx); err != nil {
log.Printf("[Scale] error resuming task %s, error: %s\n", name, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
} else if taskStatus.Status == containerd.Stopped {
// Stopped tasks cannot be restarted, must be removed, and created again
if _, delErr := task.Delete(ctx); delErr != nil {
log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr)
http.Error(w, delErr.Error(), http.StatusBadRequest)
if _, err := task.Delete(ctx); err != nil {
log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
createNewTask = true
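The branches in this hunk form a small state machine over the task's status. The sketch below restates it as a pure function so each transition can be checked without containerd. `TaskState`, its values and `scalePlan` are hypothetical simplifications, and the `Missing` case assumes that a task which cannot be loaded is also re-created, which the lines elided from this diff appear to handle via `createNewTask`.

```go
package main

import "fmt"

// TaskState is a hypothetical, simplified stand-in for the containerd task
// status values the handler inspects.
type TaskState int

const (
	Missing TaskState = iota // ctr.Task(...) returned an error
	Running
	Paused
	Stopped
)

// scalePlan lists, in order, the steps the replica handler takes for a
// given task state and requested replica count.
func scalePlan(state TaskState, replicas uint64) []string {
	if replicas == 0 {
		// Scale to zero: only a running task needs pausing.
		if state == Running {
			return []string{"pause"}
		}
		return nil
	}
	var steps []string
	switch state {
	case Paused:
		steps = append(steps, "resume")
	case Stopped:
		// Stopped tasks cannot be restarted, so delete and re-create them.
		steps = append(steps, "delete", "create")
	case Missing:
		// Assumption: a task that cannot be loaded is re-created too.
		steps = append(steps, "create")
	}
	// Every scale-up path ends with the blocking health check.
	return append(steps, "waitUntilHealthy")
}

func main() {
	fmt.Println(scalePlan(Stopped, 1)) // [delete create waitUntilHealthy]
	fmt.Println(scalePlan(Running, 0)) // [pause]
}
```

Note that an already running task still goes through the health check when scaled up, since the check sits after the `createNewTask` block rather than inside it.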
@@ -131,12 +140,70 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
}
if createNewTask {
deployErr := createTask(ctx, client, ctr, cni)
if deployErr != nil {
log.Printf("[Scale] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
err := createTask(ctx, client, ctr, cni)
if err != nil {
log.Printf("[Scale] error deploying %s, error: %s\n", name, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
if err := waitUntilHealthy(name, resolver, healthPath); err != nil {
log.Printf("[Scale] error waiting for function %s to become ready, error: %s\n", name, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
}
// waitUntilHealthy blocks until the healthPath returns a HTTP 200 for the
// IP address resolved for the given function.
// Maximum retries: 100
// Delay between each attempt: 20ms
// A custom path can be set via an annotation in the function's spec:
// com.openfaas.health.http.path: /handlers/ready
//
func waitUntilHealthy(name string, resolver proxy.BaseURLResolver, healthPath string) error {
endpoint, err := resolver.Resolve(name)
if err != nil {
return err
}
host, port, _ := net.SplitHostPort(endpoint.Host)
u, err := url.Parse(fmt.Sprintf("http://%s:%s%s", host, port, healthPath))
if err != nil {
return err
}
// Try to hit the health endpoint and block until
// ready.
attempts := 100
pause := time.Millisecond * 20
for i := 0; i < attempts; i++ {
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.Body != nil {
res.Body.Close()
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected health status: %d", res.StatusCode)
}
if err == nil {
break
}
time.Sleep(pause)
}
return nil
}