Compare commits

...

23 Commits
0.7.5 ... 0.8.1

Author SHA1 Message Date
cece6cf1ef Improve journalctl version compat
**What**
- Remove the `output-fields` flag because not all journalctl versions
  support it
- Add a short sleep to the start of the log stream to avoid some kind of
  race/buffering condition with the Handler

Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
2020-03-07 10:11:09 +00:00
22882e2643 Initial journald log provider attempt
**What**
- journald log provider using exec to journalctl
```
journalctl -t <namespace>:<name>  --output=json --since=<timestamp> <--follow> --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP
```
- This can be tested manually using `faas-cli logs` as normal, e.g.
  `faas-cli logs nodeinfo` should tail the last 5 mins of logs.
- Very basic tests ensuring that the `journalctl` comamand is correctly
  construction and that the json log entrys are parsed correctly.
- Add simple e2e test to grep the function logs

Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
2020-03-07 10:11:09 +00:00
667d74aaf7 Skip adding function if GetFunction returns error
When ListFunctions populate it's function map, it should not add
functions that GetFunction returned error.

Signed-off-by: Carlos de Paula <me@carlosedp.com>
2020-03-07 07:25:19 +00:00
9dcdbfb7e3 Update DEV.md 2020-03-05 15:28:16 +00:00
3a9b81200e Promote to 0.8.0 with pull policy of always
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-03-02 17:23:48 +00:00
734425de25 Update the dev workflow
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-03-01 20:37:19 +00:00
70e7e0d25a Apply gofmt
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-03-01 20:13:18 +00:00
be8574ecd0 Always pull images by default
The behaviour prior to this patch caused some confusion for
users since they expected a behaviour like Swarm / Kubernetes
which always pulls images by default, even if cached. I've tested
the change and it is working as expected. By default images are
always pulled upon deployment.

To revert to the prior behaviour, simply add to faasd up:
--pull-policy=IfNotPresent

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-03-01 20:13:18 +00:00
a0110b3019 Makefile: added labels to test-e2e
Signed-off-by: kadern0 <kaderno@gmail.com>
2020-02-27 21:56:43 +00:00
87c71b090f Add label for single function query
Adding label when a /system/function/<name> endpoint
is invoked as it was missed in the previous commit

Signed-off-by: Martin Dekov <mvdekov@gmail.com>
2020-02-25 07:11:44 +00:00
dc8667d36a Return not implemented for logs
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-23 20:50:24 +00:00
137d199cb5 Update to 0.7.7
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-23 20:19:30 +00:00
560c295eb0 Enable labeling containers
Enabling the faasd-provider to label containers

Signed-off-by: Martin Dekov <mvdekov@gmail.com>
2020-02-23 20:06:30 +00:00
93325b713e Enable containerd on reboot
Fixes #48

Signed-off-by: Gabriel Duke <gabeduke@gmail.com>
2020-02-23 20:05:11 +00:00
2307fc71c5 Add log shim and collect command
The collect command redirects function logs to the journal for
viewing on journalctl. faas-cli logs is not implemented as of
yet. View logs with journalctl -t openfaas-fn:FN_NAME_HERE.

Tested on Dell XPS with Ubuntu Linux. The approach takes
inspiration from the Stellar project.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-23 19:54:49 +00:00
853830c018 Add shim for collecting logs
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-23 19:54:49 +00:00
262770a0b7 Add note for credentials helper
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-22 18:56:16 +00:00
0efb6d492f Update developer docs
* Adds new step for installing containerd with systemd
* Adds warning up top that this is not for newbies :-)

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-17 17:47:11 +00:00
27cfe465ca Update context on DO
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-17 17:27:55 +00:00
d6c4ebaf96 Add help & support
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-17 17:26:18 +00:00
e9d1423315 Add terraform sample
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-17 17:18:24 +00:00
4bca5c36a5 Use 0.7.5 for auth support 2020-02-14 12:00:22 +00:00
10e7a2f07c Add tutorial for private registry
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2020-02-13 12:18:15 +00:00
28 changed files with 1469 additions and 174 deletions

17
Gopkg.lock generated
View File

@ -55,7 +55,7 @@
version = "0.7.1"
[[projects]]
digest = "1:386ca0ac781cc1b630b3ed21725759770174140164b3faf3810e6ed6366a970b"
digest = "1:cf83a14c8042951b0dcd74758fc32258111ecc7838cbdf5007717172cab9ca9b"
name = "github.com/containerd/containerd"
packages = [
".",
@ -103,6 +103,7 @@
"remotes/docker/schema1",
"rootfs",
"runtime/linux/runctypes",
"runtime/v2/logging",
"runtime/v2/runc/options",
"snapshots",
"snapshots/proxy",
@ -169,6 +170,14 @@
revision = "4cfb7b568922a3c79a23e438dc52fe537fc9687e"
version = "v0.7.1"
[[projects]]
digest = "1:bcf36df8d43860bfde913d008301aef27c6e9a303582118a837c4a34c0d18167"
name = "github.com/coreos/go-systemd"
packages = ["journal"]
pruneopts = "UT"
revision = "d3cd4ed1dbcf5835feba465b180436db54f20228"
version = "v21"
[[projects]]
digest = "1:92ebc9c068ab8e3fff03a58694ee33830964f6febd0130069aadce328802de14"
name = "github.com/docker/cli"
@ -352,12 +361,13 @@
version = "0.18.10"
[[projects]]
digest = "1:6f21508bd38feec0d440ca862f5adcb4c955713f3eb4e075b9af731e6ef258ba"
digest = "1:7a20be0bdfb2c05a4a7b955cb71645fe2983aa3c0bbae10d6bba3e2dd26ddd0d"
name = "github.com/openfaas/faas-provider"
packages = [
".",
"auth",
"httputil",
"logs",
"proxy",
"types",
]
@ -574,7 +584,9 @@
"github.com/containerd/containerd/oci",
"github.com/containerd/containerd/remotes",
"github.com/containerd/containerd/remotes/docker",
"github.com/containerd/containerd/runtime/v2/logging",
"github.com/containerd/go-cni",
"github.com/coreos/go-systemd/journal",
"github.com/docker/cli/cli/config",
"github.com/docker/cli/cli/config/configfile",
"github.com/docker/distribution/reference",
@ -582,6 +594,7 @@
"github.com/morikuni/aec",
"github.com/opencontainers/runtime-spec/specs-go",
"github.com/openfaas/faas-provider",
"github.com/openfaas/faas-provider/logs",
"github.com/openfaas/faas-provider/proxy",
"github.com/openfaas/faas-provider/types",
"github.com/openfaas/faas/gateway/requests",

View File

@ -37,9 +37,10 @@ prepare-test:
.PHONY: test-e2e
test-e2e:
sudo cat /var/lib/faasd/secrets/basic-auth-password | /usr/local/bin/faas-cli login --password-stdin
/usr/local/bin/faas-cli store deploy figlet --env write_timeout=1s --env read_timeout=1s
/usr/local/bin/faas-cli store deploy figlet --env write_timeout=1s --env read_timeout=1s --label testing=true
sleep 5
/usr/local/bin/faas-cli list -v
/usr/local/bin/faas-cli describe figlet | grep testing
uname | /usr/local/bin/faas-cli invoke figlet
uname | /usr/local/bin/faas-cli invoke figlet --async
sleep 10
@ -47,3 +48,5 @@ test-e2e:
/usr/local/bin/faas-cli remove figlet
sleep 3
/usr/local/bin/faas-cli list
sleep 1
/usr/local/bin/faas-cli logs figlet --follow=false | grep Forking

View File

@ -19,10 +19,10 @@ faasd is the same OpenFaaS experience and ecosystem, but without Kubernetes. Fun
## What does faasd deploy?
* faasd - itself, and its [faas-provider](https://github.com/openfaas/faas-provider) for containerd
* [Prometheus](https://github.com/prometheus/prometheus)
* [OpenFaaS Gateway & UI](https://github.com/openfaas/faas/tree/master/gateway)
* [OpenFaaS queue-worker for NATS](https://github.com/openfaas/nats-queue-worker)
* faasd - itself, and its [faas-provider](https://github.com/openfaas/faas-provider) for containerd - CRUD for functions and services, implements the OpenFaaS REST API
* [Prometheus](https://github.com/prometheus/prometheus) - for monitoring of services, metrics, scaling and dashboards
* [OpenFaaS Gateway](https://github.com/openfaas/faas/tree/master/gateway) - the UI portal, CLI, and other OpenFaaS tooling can talk to this.
* [OpenFaaS queue-worker for NATS](https://github.com/openfaas/nats-queue-worker) - run your invocations in the background without adding any code. See also: [asynchronous invocations](https://docs.openfaas.com/reference/triggers/#async-nats-streaming)
* [NATS](https://nats.io) for asynchronous processing and queues
You'll also need:
@ -35,9 +35,11 @@ You can use the standard [faas-cli](https://github.com/openfaas/faas-cli) along
## Tutorials
### Get started on DigitalOcean or with cloud-init
### Get started on DigitalOcean, or any other IaaS
* [Build a Serverless appliance with cloud-init and faasd](https://blog.alexellis.io/deploy-serverless-faasd-with-cloud-init/)
If your IaaS supports `user_data` aka "cloud-init", then this guide is for you. If not, then checkout the approach and feel free to run each step manually.
* [Build a Serverless appliance with faasd](https://blog.alexellis.io/deploy-serverless-faasd-with-cloud-init/)
### Run locally on MacOS, Linux, or Windows with Multipass.run
@ -49,14 +51,69 @@ You can run this tutorial on your Raspberry Pi, or adapt the steps for a regular
* [faasd - lightweight Serverless for your Raspberry Pi](https://blog.alexellis.io/faasd-for-lightweight-serverless/)
### Using private repos
### Terraform for DigitalOcean
Automate everything within < 60 seconds and get a public URL and IP address back. Customise as required, or adapt to your preferred cloud such as AWS EC2.
* [Provision faasd 0.7.5 on DigitalOcean with Terraform 0.12.0](https://gist.github.com/alexellis/fd618bd2f957eb08c44d086ef2fc3906)
### A note on private repos / registries
To use private image repos, `~/.docker/config.json` needs to be copied to `/var/lib/faasd/.docker/config.json`.
If you'd like to set up your own private registry, [see this tutorial](https://blog.alexellis.io/get-a-tls-enabled-docker-registry-in-5-minutes/).
Beware that running `docker login` on MacOS and Windows may create an empty file with your credentials stored in the system helper.
Alternatively, use you can use the `registry-login` command from the OpenFaaS Cloud bootstrap tool (ofc-bootstrap):
```bash
curl -sLSf https://raw.githubusercontent.com/openfaas-incubator/ofc-bootstrap/master/get.sh | sudo sh
ofc-bootstrap registry-login --username <your-registry-username> --password-stdin
# (the enter your password and hit return)
```
The file will be created in `./credentials/`
### Logs for functions
You can view the logs of functions using `journalctl`:
```bash
journalctl -t openfaas-fn:FUNCTION_NAME
faas-cli store deploy figlet
journalctl -t openfaas-fn:figlet -f &
echo logs | faas-cli invoke figlet
```
### Manual / developer instructions
See [here for manual / developer instructions](docs/DEV.md)
## Getting help
### Docs
The [OpenFaaS docs](https://docs.openfaas.com/) provide a wealth of information and are kept up to date with new features.
### Function and template store
For community functions see `faas-cli store --help`
For templates built by the community see: `faas-cli template store list`, you can also use the `dockerfile` template if you just want to migrate an existing service without the benefits of using a template.
### Workshop
[The OpenFaaS workshop](https://github.com/openfaas/workshop/) is a set of 12 self-paced labs and provides a great starting point
### Community support
An active community of almost 3000 users awaits you on Slack. Over 250 of those users are also contributors and help maintain the code.
* [Join Slack](https://slack.openfaas.io/)
## Backlog
### Supported operations
@ -64,7 +121,7 @@ See [here for manual / developer instructions](docs/DEV.md)
* `faas login`
* `faas up`
* `faas list`
* `faas describe`
* `faas describe`
* `faas deploy --update=true --replace=false`
* `faas invoke --async`
* `faas invoke`
@ -73,13 +130,13 @@ See [here for manual / developer instructions](docs/DEV.md)
* `faas version`
* `faas namespace`
* `faas secret`
* `faas logs`
Scale from and to zero is also supported. On a Dell XPS with a small, pre-pulled image unpausing an existing task took 0.19s and starting a task for a killed function took 0.39s. There may be further optimizations to be gained.
Other operations are pending development in the provider such as:
* `faas logs` - to stream logs on-demand for a known function
* `faas auth` - for the OAuth2 and OIDC integration
* `faas auth` - supported for Basic Authentication, but OAuth2 & OIDC require a patch
## Todo

View File

@ -11,12 +11,13 @@ runcmd:
- curl -sLSf https://github.com/containerd/containerd/releases/download/v1.3.2/containerd-1.3.2.linux-amd64.tar.gz > /tmp/containerd.tar.gz && tar -xvf /tmp/containerd.tar.gz -C /usr/local/bin/ --strip-components=1
- curl -SLfs https://raw.githubusercontent.com/containerd/containerd/v1.3.2/containerd.service | tee /etc/systemd/system/containerd.service
- systemctl daemon-reload && systemctl start containerd
- systemctl enable containerd
- /sbin/sysctl -w net.ipv4.conf.all.forwarding=1
- mkdir -p /opt/cni/bin
- curl -sSL https://github.com/containernetworking/plugins/releases/download/v0.8.5/cni-plugins-linux-amd64-v0.8.5.tgz | tar -xz -C /opt/cni/bin
- mkdir -p /go/src/github.com/openfaas/
- cd /go/src/github.com/openfaas/ && git clone https://github.com/openfaas/faasd
- curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.7.4/faasd" --output "/usr/local/bin/faasd" && chmod a+x "/usr/local/bin/faasd"
- curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.7.7/faasd" --output "/usr/local/bin/faasd" && chmod a+x "/usr/local/bin/faasd"
- cd /go/src/github.com/openfaas/faasd/ && /usr/local/bin/faasd install
- systemctl status -l containerd --no-pager
- journalctl -u faasd-provider --no-pager

60
cmd/collect.go Normal file
View File

@ -0,0 +1,60 @@
package cmd
import (
"bufio"
"context"
"fmt"
"io"
"sync"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/coreos/go-systemd/journal"
"github.com/spf13/cobra"
)
func CollectCommand() *cobra.Command {
return collectCmd
}
var collectCmd = &cobra.Command{
Use: "collect",
Short: "Collect logs to the journal",
RunE: runCollect,
}
func runCollect(_ *cobra.Command, _ []string) error {
logging.Run(logStdio)
return nil
}
// logStdio copied from
// https://github.com/containerd/containerd/pull/3085
// https://github.com/stellarproject/orbit
func logStdio(ctx context.Context, config *logging.Config, ready func() error) error {
// construct any log metadata for the container
vars := map[string]string{
"SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID),
}
var wg sync.WaitGroup
wg.Add(2)
// forward both stdout and stderr to the journal
go copy(&wg, config.Stdout, journal.PriInfo, vars)
go copy(&wg, config.Stderr, journal.PriErr, vars)
// signal that we are ready and setup for the container to be started
if err := ready(); err != nil {
return err
}
wg.Wait()
return nil
}
func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) {
defer wg.Done()
s := bufio.NewScanner(r)
for s.Scan() {
if s.Err() != nil {
return
}
journal.Send(s.Text(), pri, vars)
}
}

View File

@ -9,84 +9,101 @@ import (
"os"
"path"
"github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/provider/config"
"github.com/openfaas/faasd/pkg/provider/handlers"
"github.com/containerd/containerd"
bootstrap "github.com/openfaas/faas-provider"
"github.com/openfaas/faas-provider/logs"
"github.com/openfaas/faas-provider/proxy"
"github.com/openfaas/faas-provider/types"
"github.com/openfaas/faasd/pkg/cninetwork"
faasdlogs "github.com/openfaas/faasd/pkg/logs"
"github.com/openfaas/faasd/pkg/provider/config"
"github.com/openfaas/faasd/pkg/provider/handlers"
"github.com/spf13/cobra"
)
var providerCmd = &cobra.Command{
Use: "provider",
Short: "Run the faasd-provider",
RunE: runProvider,
}
func runProvider(_ *cobra.Command, _ []string) error {
config, providerConfig, err := config.ReadFromEnv(types.OsEnv{})
if err != nil {
return err
func makeProviderCmd() *cobra.Command {
var command = &cobra.Command{
Use: "provider",
Short: "Run the faasd-provider",
}
log.Printf("faasd-provider starting..\tService Timeout: %s\n", config.WriteTimeout.String())
command.Flags().String("pull-policy", "Always", `Set to "Always" to force a pull of images upon deployment, or "IfNotPresent" to try to use a cached image.`)
wd, err := os.Getwd()
if err != nil {
return err
command.RunE = func(_ *cobra.Command, _ []string) error {
pullPolicy, flagErr := command.Flags().GetString("pull-policy")
if flagErr != nil {
return flagErr
}
alwaysPull := false
if pullPolicy == "Always" {
alwaysPull = true
}
config, providerConfig, err := config.ReadFromEnv(types.OsEnv{})
if err != nil {
return err
}
log.Printf("faasd-provider starting..\tService Timeout: %s\n", config.WriteTimeout.String())
wd, err := os.Getwd()
if err != nil {
return err
}
writeHostsErr := ioutil.WriteFile(path.Join(wd, "hosts"),
[]byte(`127.0.0.1 localhost`), workingDirectoryPermission)
if writeHostsErr != nil {
return fmt.Errorf("cannot write hosts file: %s", writeHostsErr)
}
writeResolvErr := ioutil.WriteFile(path.Join(wd, "resolv.conf"),
[]byte(`nameserver 8.8.8.8`), workingDirectoryPermission)
if writeResolvErr != nil {
return fmt.Errorf("cannot write resolv.conf file: %s", writeResolvErr)
}
cni, err := cninetwork.InitNetwork()
if err != nil {
return err
}
client, err := containerd.New(providerConfig.Sock)
if err != nil {
return err
}
defer client.Close()
invokeResolver := handlers.NewInvokeResolver(client)
userSecretPath := path.Join(wd, "secrets")
bootstrapHandlers := types.FaaSHandlers{
FunctionProxy: proxy.NewHandlerFunc(*config, invokeResolver),
DeleteHandler: handlers.MakeDeleteHandler(client, cni),
DeployHandler: handlers.MakeDeployHandler(client, cni, userSecretPath, alwaysPull),
FunctionReader: handlers.MakeReadHandler(client),
ReplicaReader: handlers.MakeReplicaReaderHandler(client),
ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni),
UpdateHandler: handlers.MakeUpdateHandler(client, cni, userSecretPath, alwaysPull),
HealthHandler: func(w http.ResponseWriter, r *http.Request) {},
InfoHandler: handlers.MakeInfoHandler(Version, GitCommit),
ListNamespaceHandler: listNamespaces(),
SecretHandler: handlers.MakeSecretHandler(client, userSecretPath),
LogHandler: logs.NewLogHandlerFunc(faasdlogs.New(), config.ReadTimeout),
}
log.Printf("Listening on TCP port: %d\n", *config.TCPPort)
bootstrap.Serve(&bootstrapHandlers, config)
return nil
}
writeHostsErr := ioutil.WriteFile(path.Join(wd, "hosts"),
[]byte(`127.0.0.1 localhost`), workingDirectoryPermission)
if writeHostsErr != nil {
return fmt.Errorf("cannot write hosts file: %s", writeHostsErr)
}
writeResolvErr := ioutil.WriteFile(path.Join(wd, "resolv.conf"),
[]byte(`nameserver 8.8.8.8`), workingDirectoryPermission)
if writeResolvErr != nil {
return fmt.Errorf("cannot write resolv.conf file: %s", writeResolvErr)
}
cni, err := cninetwork.InitNetwork()
if err != nil {
return err
}
client, err := containerd.New(providerConfig.Sock)
if err != nil {
return err
}
defer client.Close()
invokeResolver := handlers.NewInvokeResolver(client)
userSecretPath := path.Join(wd, "secrets")
bootstrapHandlers := types.FaaSHandlers{
FunctionProxy: proxy.NewHandlerFunc(*config, invokeResolver),
DeleteHandler: handlers.MakeDeleteHandler(client, cni),
DeployHandler: handlers.MakeDeployHandler(client, cni, userSecretPath),
FunctionReader: handlers.MakeReadHandler(client),
ReplicaReader: handlers.MakeReplicaReaderHandler(client),
ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni),
UpdateHandler: handlers.MakeUpdateHandler(client, cni, userSecretPath),
HealthHandler: func(w http.ResponseWriter, r *http.Request) {},
InfoHandler: handlers.MakeInfoHandler(Version, GitCommit),
ListNamespaceHandler: listNamespaces(),
SecretHandler: handlers.MakeSecretHandler(client, userSecretPath),
}
log.Printf("Listening on TCP port: %d\n", *config.TCPPort)
bootstrap.Serve(&bootstrapHandlers, config)
return nil
return command
}
func listNamespaces() func(w http.ResponseWriter, r *http.Request) {

View File

@ -14,7 +14,12 @@ func init() {
rootCommand.AddCommand(versionCmd)
rootCommand.AddCommand(upCmd)
rootCommand.AddCommand(installCmd)
rootCommand.AddCommand(providerCmd)
rootCommand.AddCommand(makeProviderCmd())
rootCommand.AddCommand(collectCmd)
}
func RootCommand() *cobra.Command {
return rootCommand
}
var (

View File

@ -14,8 +14,8 @@ import (
"github.com/pkg/errors"
"github.com/openfaas/faasd/pkg"
"github.com/alexellis/k3sup/pkg/env"
"github.com/openfaas/faasd/pkg"
"github.com/sethvargo/go-password/password"
"github.com/spf13/cobra"
)
@ -116,6 +116,7 @@ func runUp(_ *cobra.Command, _ []string) error {
log.Println(fileErr)
return
}
host := ""
lines := strings.Split(string(fileData), "\n")
for _, line := range lines {
@ -172,7 +173,7 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
wd, _ := os.Getwd()
return []pkg.Service{
pkg.Service{
{
Name: "basic-auth-plugin",
Image: "docker.io/openfaas/basic-auth-plugin:0.18.10" + archSuffix,
Env: []string{
@ -182,11 +183,11 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
"pass_filename=basic-auth-password",
},
Mounts: []pkg.Mount{
pkg.Mount{
{
Src: path.Join(path.Join(wd, "secrets"), "basic-auth-password"),
Dest: path.Join(containerSecretMountDir, "basic-auth-password"),
},
pkg.Mount{
{
Src: path.Join(path.Join(wd, "secrets"), "basic-auth-user"),
Dest: path.Join(containerSecretMountDir, "basic-auth-user"),
},
@ -194,26 +195,26 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
Caps: []string{"CAP_NET_RAW"},
Args: nil,
},
pkg.Service{
{
Name: "nats",
Env: []string{""},
Image: "docker.io/library/nats-streaming:0.11.2",
Caps: []string{},
Args: []string{"/nats-streaming-server", "-m", "8222", "--store=memory", "--cluster_id=faas-cluster"},
},
pkg.Service{
{
Name: "prometheus",
Env: []string{},
Image: "docker.io/prom/prometheus:v2.14.0",
Mounts: []pkg.Mount{
pkg.Mount{
{
Src: path.Join(wd, "prometheus.yml"),
Dest: "/etc/prometheus/prometheus.yml",
},
},
Caps: []string{"CAP_NET_RAW"},
},
pkg.Service{
{
Name: "gateway",
Env: []string{
"basic_auth=true",
@ -231,18 +232,18 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
},
Image: "docker.io/openfaas/gateway:0.18.8" + archSuffix,
Mounts: []pkg.Mount{
pkg.Mount{
{
Src: path.Join(path.Join(wd, "secrets"), "basic-auth-password"),
Dest: path.Join(containerSecretMountDir, "basic-auth-password"),
},
pkg.Mount{
{
Src: path.Join(path.Join(wd, "secrets"), "basic-auth-user"),
Dest: path.Join(containerSecretMountDir, "basic-auth-user"),
},
},
Caps: []string{"CAP_NET_RAW"},
},
pkg.Service{
{
Name: "queue-worker",
Env: []string{
"faas_nats_address=nats",
@ -257,11 +258,11 @@ func makeServiceDefinitions(archSuffix string) []pkg.Service {
},
Image: "docker.io/openfaas/queue-worker:0.9.0",
Mounts: []pkg.Mount{
pkg.Mount{
{
Src: path.Join(path.Join(wd, "secrets"), "basic-auth-password"),
Dest: path.Join(containerSecretMountDir, "basic-auth-password"),
},
pkg.Mount{
{
Src: path.Join(path.Join(wd, "secrets"), "basic-auth-user"),
Dest: path.Join(containerSecretMountDir, "basic-auth-user"),
},

View File

@ -1,5 +1,7 @@
## Manual installation of faasd for development
> Note: if you're just wanting to try out faasd, then it's likely that you're on the wrong page. This is a detailed set of instructions for those wanting to contribute or customise faasd. Feel free to go back to the homepage and pick a tutorial instead.
### Pre-reqs
* Linux
@ -30,44 +32,49 @@ curl -sLSf https://github.com/containerd/containerd/releases/download/v$VER/cont
containerd -version
```
* Or get my containerd binaries for armhf
* Or get my containerd binaries for Raspberry Pi (armhf)
Building containerd on armhf is extremely slow.
Building `containerd` on armhf is extremely slow, so I've provided binaries for you.
```sh
curl -sSL https://github.com/alexellis/containerd-armhf/releases/download/v1.3.2/containerd.tgz | sudo tar -xvz --strip-components=2 -C /usr/local/bin/
```
```sh
curl -sSL https://github.com/alexellis/containerd-armhf/releases/download/v1.3.2/containerd.tgz | sudo tar -xvz --strip-components=2 -C /usr/local/bin/
```
* Or clone / build / install [containerd](https://github.com/containerd/containerd) from source:
```sh
export GOPATH=$HOME/go/
mkdir -p $GOPATH/src/github.com/containerd
cd $GOPATH/src/github.com/containerd
git clone https://github.com/containerd/containerd
cd containerd
git fetch origin --tags
git checkout v1.3.2
```sh
export GOPATH=$HOME/go/
mkdir -p $GOPATH/src/github.com/containerd
cd $GOPATH/src/github.com/containerd
git clone https://github.com/containerd/containerd
cd containerd
git fetch origin --tags
git checkout v1.3.2
make
sudo make install
make
sudo make install
containerd --version
```
containerd --version
```
Kill any old containerd version:
#### Ensure containerd is running
```sh
# Kill any old version
sudo killall containerd
sudo systemctl disable containerd
curl -sLS https://raw.githubusercontent.com/containerd/containerd/master/containerd.service > /tmp/containerd.service
sudo cp /tmp/containerd.service /lib/systemd/system/
sudo systemctl enable containerd
sudo systemctl daemon-reload
sudo systemctl restart containerd
```
Start containerd in a new terminal:
Or run ad-hoc:
```sh
sudo containerd &
```
#### Enable forwarding
> This is required to allow containers in containerd to access the Internet via your computer's primary network interface.
@ -90,10 +97,11 @@ echo "net.ipv4.conf.all.forwarding=1" | sudo tee -a /etc/sysctl.conf
sudo apt update \
&& sudo apt install -qy \
runc \
bridge-utils
bridge-utils \
make
```
You may find alternatives for CentOS and other distributions.
You may find alternative package names for CentOS and other Linux distributions.
#### Install Go 1.13 (x86_64)
@ -109,6 +117,13 @@ export PATH=$PATH:/usr/local/go/bin/
go version
```
You should also add the following to `~/.bash_profile`:
```sh
export GOPATH=$HOME/go/
export PATH=$PATH:/usr/local/go/bin/
```
#### Or on Raspberry Pi (armhf)
```sh
@ -139,38 +154,125 @@ sudo mkdir -p /opt/cni/bin
curl -sSL https://github.com/containernetworking/plugins/releases/download/${CNI_VERSION}/cni-plugins-linux-${ARCH}-${CNI_VERSION}.tgz | sudo tar -xz -C /opt/cni/bin
```
Run or install faasd, which brings up the gateway and Prometheus as containers
#### Clone faasd and its systemd unit files
```sh
mkdir -p $GOPATH/src/github.com/openfaas/
cd $GOPATH/src/github.com/openfaas/
git clone https://github.com/openfaas/faasd
```
#### Build `faasd` from source (optional)
```sh
cd $GOPATH/src/github.com/openfaas/faasd
go build
# Install with systemd
# sudo ./faasd install
# Or run interactively
# sudo ./faasd up
cd faasd
make local
```
#### Build and run `faasd` (binaries)
```sh
# For x86_64
sudo curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.7.4/faasd" \
sudo curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.8.0/faasd" \
-o "/usr/local/bin/faasd" \
&& sudo chmod a+x "/usr/local/bin/faasd"
# armhf
sudo curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.7.4/faasd-armhf" \
sudo curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.8.0/faasd-armhf" \
-o "/usr/local/bin/faasd" \
&& sudo chmod a+x "/usr/local/bin/faasd"
# arm64
sudo curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.7.4/faasd-arm64" \
sudo curl -fSLs "https://github.com/openfaas/faasd/releases/download/0.8.0/faasd-arm64" \
-o "/usr/local/bin/faasd" \
&& sudo chmod a+x "/usr/local/bin/faasd"
```
#### Install `faasd`
```sh
# Install with systemd
sudo cp bin/faasd /usr/local/bin
sudo faasd install
2020/02/17 17:38:06 Writing to: "/var/lib/faasd/secrets/basic-auth-password"
2020/02/17 17:38:06 Writing to: "/var/lib/faasd/secrets/basic-auth-user"
Login with:
sudo cat /var/lib/faasd/secrets/basic-auth-password | faas-cli login -s
```
You can now log in either from this machine or a remote machine using the OpenFaaS UI, or CLI.
Check that faasd is ready:
```
sudo journalctl -u faasd
```
You should see output like:
```
Feb 17 17:46:35 gold-survive faasd[4140]: 2020/02/17 17:46:35 Starting faasd proxy on 8080
Feb 17 17:46:35 gold-survive faasd[4140]: Gateway: 10.62.0.5:8080
Feb 17 17:46:35 gold-survive faasd[4140]: 2020/02/17 17:46:35 [proxy] Wait for done
Feb 17 17:46:35 gold-survive faasd[4140]: 2020/02/17 17:46:35 [proxy] Begin listen on 8080
```
To get the CLI for the command above run:
```sh
curl -sSLf https://cli.openfaas.com | sudo sh
```
#### Make a change to `faasd`
There are two components you can hack on:
For function CRUD you will work on `faasd provider` which is started from `cmd/provider.go`
For faasd itself, you will work on the code from `faasd up`, which is started from `cmd/up.go`
Before working on either, stop the systemd services:
```
sudo systemctl stop faasd & # up command
sudo systemctl stop faasd-provider # provider command
```
Here is a workflow you can use for each code change:
Enter the directory of the source code, and build a new binary:
```bash
cd $GOPATH/src/github.com/openfaas/faasd
go build
```
Copy that binary to `/usr/local/bin/`
```bash
cp faasd /usr/local/bin/
```
To run `faasd up`, run it from its working directory as root
```bash
sudo -i
cd /var/lib/faasd
faasd up
```
Now to run `faasd provider`, run it from its working directory:
```bash
sudo -i
cd /var/lib/faasd-provider
faasd provider
```
#### At run-time
Look in `hosts` in the current working folder or in `/var/lib/faasd/` to get the IP for the gateway or Prometheus

16
main.go
View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"os"
"github.com/openfaas/faasd/cmd"
@ -15,6 +16,21 @@ var (
)
func main() {
if _, ok := os.LookupEnv("CONTAINER_ID"); ok {
collect := cmd.RootCommand()
collect.SetArgs([]string{"collect"})
collect.SilenceUsage = true
collect.SilenceErrors = true
err := collect.Execute()
if err != nil {
fmt.Fprintf(os.Stderr, err.Error())
os.Exit(1)
}
os.Exit(0)
}
if err := cmd.Execute(Version, GitCommit); err != nil {
os.Exit(1)
}

6
pkg/contants.go Normal file
View File

@ -0,0 +1,6 @@
package pkg
const (
// FunctionNamespace is the default containerd namespace functions are created
FunctionNamespace = "openfaas-fn"
)

188
pkg/logs/requestor.go Normal file
View File

@ -0,0 +1,188 @@
package logs
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os/exec"
"strconv"
"strings"
"time"
"github.com/openfaas/faas-provider/logs"
faasd "github.com/openfaas/faasd/pkg"
)
type requester struct{}
// New returns a new journalctl log Requester
func New() logs.Requester {
return &requester{}
}
// Query submits a log request to the actual logging system.
func (r *requester) Query(ctx context.Context, req logs.Request) (<-chan logs.Message, error) {
_, err := exec.LookPath("journalctl")
if err != nil {
return nil, fmt.Errorf("can not find journalctl: %w", err)
}
cmd := buildCmd(ctx, req)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to create journalctl pipe: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("failed to create journalctl err pipe: %w", err)
}
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("failed to create journalctl: %w", err)
}
// call start and get the stdout prior to streaming so that we can return a meaningful
// error for as long as possible. If the cmd starts correctly, we are highly likely to
// succeed anyway
msgs := make(chan logs.Message, 100)
go streamLogs(ctx, cmd, stdout, msgs)
go logErrOut(stderr)
return msgs, nil
}
// buildCmd reeturns the equivalent of
//
// journalctl -t <namespace>:<name> \
// --output=json \
// --since=<timestamp> \
// <--follow> \
// --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP
func buildCmd(ctx context.Context, req logs.Request) *exec.Cmd {
// // set the cursor position based on req, default to 5m
since := time.Now().Add(-5 * time.Minute)
if req.Since != nil && req.Since.Before(time.Now()) {
since = *req.Since
}
namespace := req.Namespace
if namespace == "" {
namespace = faasd.FunctionNamespace
}
// find the description of the fields here
// https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html
// the available fields can vary greatly, the selected fields were detemined by
// trial and error with journalctl in an ubuntu VM (via multipass)
args := []string{
"--utc",
"--no-pager",
"--output=json",
"--identifier=" + namespace + ":" + req.Name,
fmt.Sprintf("--since=%s", since.UTC().Format("2006-01-02 15:04:05")),
}
if req.Follow {
args = append(args, "--follow")
}
if req.Tail > 0 {
args = append(args, fmt.Sprintf("--lines=%d", req.Tail))
}
return exec.CommandContext(ctx, "journalctl", args...)
}
// streamLogs copies log entries from the journalctl `cmd`/`out` to `msgs`
// the loop is based on the Decoder example in the docs
// https://golang.org/pkg/encoding/json/#Decoder.Decode
func streamLogs(ctx context.Context, cmd *exec.Cmd, out io.ReadCloser, msgs chan logs.Message) {
// without this sleep the channel seems to get stuck. This results in either no log messages
// being read by the Handler _or_ the messages are read but only flushed when the request
// timesout
time.Sleep(time.Millisecond)
log.Println("starting journal stream using ", cmd.String())
// will ensure `out` is closed and all related resources cleaned up
go func() {
err := cmd.Wait()
log.Println("wait result", err)
}()
defer func() {
log.Println("closing journal stream")
close(msgs)
}()
dec := json.NewDecoder(out)
for dec.More() {
if ctx.Err() != nil {
log.Println("log stream context cancelled")
return
}
// the journalctl outputs all the values as a string, so a struct with json
// tags wont help much
entry := map[string]string{}
err := dec.Decode(&entry)
if err != nil {
log.Printf("error decoding journalctl output: %s", err)
return
}
msg, err := parseEntry(entry)
if err != nil {
log.Printf("error parsing journalctl output: %s", err)
return
}
msgs <- msg
}
}
// parseEntry reads the deserialized json from journalctl into a log.Message
//
// The following fields are parsed from the journal
// - MESSAGE
// - _PID
// - SYSLOG_IDENTIFIER
// - __REALTIME_TIMESTAMP
func parseEntry(entry map[string]string) (logs.Message, error) {
logMsg := logs.Message{
Text: entry["MESSAGE"],
Instance: entry["_PID"],
}
identifier := entry["SYSLOG_IDENTIFIER"]
parts := strings.Split(identifier, ":")
if len(parts) != 2 {
return logMsg, fmt.Errorf("invalid SYSLOG_IDENTIFIER")
}
logMsg.Namespace = parts[0]
logMsg.Name = parts[1]
ts, ok := entry["__REALTIME_TIMESTAMP"]
if !ok {
return logMsg, fmt.Errorf("missing required field __REALTIME_TIMESTAMP")
}
ms, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return logMsg, fmt.Errorf("invalid timestamp: %w", err)
}
logMsg.Timestamp = time.Unix(0, ms*1000).UTC()
return logMsg, nil
}
func logErrOut(out io.ReadCloser) {
defer log.Println("stderr closed")
defer out.Close()
io.Copy(log.Writer(), out)
}

View File

@ -0,0 +1,73 @@
package logs
import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/openfaas/faas-provider/logs"
)
func Test_parseEntry(t *testing.T) {
rawEntry := `{ "__CURSOR" : "s=71c4550142d14ace8e2959e3540cc15c;i=133c;b=44864010f0d94baba7b6bf8019f82a56;m=2945cd3;t=5a00d4eb59180;x=8ed47f7f9b3d798", "__REALTIME_TIMESTAMP" : "1583353899094400", "__MONOTONIC_TIMESTAMP" : "43277523", "_BOOT_ID" : "44864010f0d94baba7b6bf8019f82a56", "SYSLOG_IDENTIFIER" : "openfaas-fn:nodeinfo", "_PID" : "2254", "MESSAGE" : "2020/03/04 20:31:39 POST / - 200 OK - ContentLength: 83", "_SOURCE_REALTIME_TIMESTAMP" : "1583353899094372" }`
expectedEntry := logs.Message{
Name: "nodeinfo",
Namespace: "openfaas-fn",
Text: "2020/03/04 20:31:39 POST / - 200 OK - ContentLength: 83",
Timestamp: time.Unix(0, 1583353899094400*1000).UTC(),
}
value := map[string]string{}
json.Unmarshal([]byte(rawEntry), &value)
entry, err := parseEntry(value)
if err != nil {
t.Fatalf("unexpected error %s", err)
}
if entry.Name != expectedEntry.Name {
t.Fatalf("expected Name %s, got %s", expectedEntry.Name, entry.Name)
}
if entry.Namespace != expectedEntry.Namespace {
t.Fatalf("expected Namespace %s, got %s", expectedEntry.Namespace, entry.Namespace)
}
if entry.Timestamp != expectedEntry.Timestamp {
t.Fatalf("expected Timestamp %s, got %s", expectedEntry.Timestamp, entry.Timestamp)
}
if entry.Text != expectedEntry.Text {
t.Fatalf("expected Text %s, got %s", expectedEntry.Text, entry.Text)
}
}
func Test_buildCmd(t *testing.T) {
ctx := context.TODO()
now := time.Now()
req := logs.Request{
Name: "loggyfunc",
Namespace: "spacetwo",
Follow: true,
Since: &now,
Tail: 5,
}
expectedArgs := fmt.Sprintf(
"--utc --no-pager --output=json --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP --identifier=spacetwo:loggyfunc --since=%s --follow --lines=5",
now.UTC().Format("2006-01-02 15:04:05"),
)
cmd := buildCmd(ctx, req).String()
if !strings.Contains(cmd, "journalctl") {
t.Fatalf("expected journalctl cmd, got cmd %s", cmd)
}
if !strings.HasSuffix(cmd, expectedArgs) {
t.Fatalf("expected arg %s,\ngot cmd %s", expectedArgs, cmd)
}
}

View File

@ -8,12 +8,14 @@ import (
"log"
"net/http"
cninetwork "github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas/gateway/requests"
faasd "github.com/openfaas/faasd/pkg"
cninetwork "github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
)
func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
@ -49,7 +51,7 @@ func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.Res
return
}
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)
// TODO: this needs to still happen if the task is paused
if function.replicas != 0 {

View File

@ -18,12 +18,13 @@ import (
"github.com/docker/distribution/reference"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/openfaas/faas-provider/types"
faasd "github.com/openfaas/faasd/pkg"
cninetwork "github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
"github.com/pkg/errors"
)
func MakeDeployHandler(client *containerd.Client, cni gocni.CNI, secretMountPath string) func(w http.ResponseWriter, r *http.Request) {
func MakeDeployHandler(client *containerd.Client, cni gocni.CNI, secretMountPath string, alwaysPull bool) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
@ -52,9 +53,9 @@ func MakeDeployHandler(client *containerd.Client, cni gocni.CNI, secretMountPath
}
name := req.Service
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)
deployErr := deploy(ctx, req, client, cni, secretMountPath)
deployErr := deploy(ctx, req, client, cni, secretMountPath, alwaysPull)
if deployErr != nil {
log.Printf("[Deploy] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)
@ -63,7 +64,7 @@ func MakeDeployHandler(client *containerd.Client, cni gocni.CNI, secretMountPath
}
}
func deploy(ctx context.Context, req types.FunctionDeployment, client *containerd.Client, cni gocni.CNI, secretMountPath string) error {
func deploy(ctx context.Context, req types.FunctionDeployment, client *containerd.Client, cni gocni.CNI, secretMountPath string, alwaysPull bool) error {
r, err := reference.ParseNormalizedNamed(req.Image)
if err != nil {
return err
@ -75,7 +76,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
snapshotter = val
}
image, err := service.PrepareImage(ctx, client, imgRef, snapshotter)
image, err := service.PrepareImage(ctx, client, imgRef, snapshotter, alwaysPull)
if err != nil {
return errors.Wrapf(err, "unable to pull image %s", imgRef)
}
@ -107,6 +108,7 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
oci.WithCapabilities([]string{"CAP_NET_RAW"}),
oci.WithMounts(mounts),
oci.WithEnv(envs)),
containerd.WithContainerLabels(*req.Labels),
)
if err != nil {
@ -120,7 +122,10 @@ func deploy(ctx context.Context, req types.FunctionDeployment, client *container
func createTask(ctx context.Context, client *containerd.Client, container containerd.Container, cni gocni.CNI) error {
name := container.ID()
task, taskErr := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
// task, taskErr := container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
task, taskErr := container.NewTask(ctx, cio.BinaryIO("/usr/local/bin/faasd", nil))
if taskErr != nil {
return fmt.Errorf("unable to start task: %s, error: %s", name, taskErr)
}

View File

@ -3,10 +3,13 @@ package handlers
import (
"context"
"fmt"
"log"
"github.com/openfaas/faasd/pkg/cninetwork"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
"github.com/openfaas/faasd/pkg/cninetwork"
faasd "github.com/openfaas/faasd/pkg"
)
type Function struct {
@ -16,38 +19,45 @@ type Function struct {
pid uint32
replicas int
IP string
labels map[string]string
}
const (
// FunctionNamespace is the containerd namespace functions are created
FunctionNamespace = "openfaas-fn"
)
// ListFunctions returns a map of all functions with running tasks on namespace
func ListFunctions(client *containerd.Client) (map[string]Function, error) {
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)
functions := make(map[string]Function)
containers, _ := client.Containers(ctx)
for _, k := range containers {
name := k.ID()
functions[name], _ = GetFunction(client, name)
f, err := GetFunction(client, name)
if err != nil {
continue
}
functions[name] = f
}
return functions, nil
}
// GetFunction returns a function that matches name
func GetFunction(client *containerd.Client, name string) (Function, error) {
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)
c, err := client.LoadContainer(ctx, name)
if err == nil {
image, _ := c.Image(ctx)
containerName := c.ID()
labels, labelErr := c.Labels(ctx)
if labelErr != nil {
log.Printf("cannot list container %s labels: %s", containerName, labelErr.Error())
}
f := Function{
name: c.ID(),
namespace: FunctionNamespace,
name: containerName,
namespace: faasd.FunctionNamespace,
image: image.Name(),
labels: labels,
}
replicas := 0

View File

@ -26,6 +26,7 @@ func MakeReadHandler(client *containerd.Client) func(w http.ResponseWriter, r *h
Image: function.image,
Replicas: uint64(function.replicas),
Namespace: function.namespace,
Labels: &function.labels,
})
}

View File

@ -21,6 +21,7 @@ func MakeReplicaReaderHandler(client *containerd.Client) func(w http.ResponseWri
AvailableReplicas: uint64(f.replicas),
Replicas: uint64(f.replicas),
Namespace: f.namespace,
Labels: &f.labels,
}
functionBytes, _ := json.Marshal(found)

View File

@ -12,6 +12,7 @@ import (
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas-provider/types"
faasd "github.com/openfaas/faasd/pkg"
)
func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) {
@ -47,7 +48,7 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h
return
}
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)
ctr, ctrErr := client.LoadContainer(ctx, name)
if ctrErr != nil {

View File

@ -8,15 +8,17 @@ import (
"log"
"net/http"
"github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
"github.com/containerd/containerd"
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas-provider/types"
faasd "github.com/openfaas/faasd/pkg"
"github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
)
func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI, secretMountPath string) func(w http.ResponseWriter, r *http.Request) {
func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI, secretMountPath string, alwaysPull bool) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
@ -53,7 +55,7 @@ func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI, secretMountPath
http.Error(w, err.Error(), http.StatusBadRequest)
}
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)
if function.replicas != 0 {
err = cninetwork.DeleteCNINetwork(ctx, cni, client, name)
if err != nil {
@ -68,7 +70,7 @@ func MakeUpdateHandler(client *containerd.Client, cni gocni.CNI, secretMountPath
return
}
deployErr := deploy(ctx, req, client, cni, secretMountPath)
deployErr := deploy(ctx, req, client, cni, secretMountPath, alwaysPull)
if deployErr != nil {
log.Printf("[Update] error deploying %s, error: %s\n", name, deployErr)
http.Error(w, deployErr.Error(), http.StatusBadRequest)

View File

@ -122,11 +122,12 @@ func getResolver(ctx context.Context, configFile *configfile.ConfigFile) (remote
return docker.NewResolver(opts), nil
}
func PrepareImage(ctx context.Context, client *containerd.Client, imageName, snapshotter string) (containerd.Image, error) {
func PrepareImage(ctx context.Context, client *containerd.Client, imageName, snapshotter string, pullAlways bool) (containerd.Image, error) {
var (
empty containerd.Image
resolver remotes.Resolver
)
if _, stErr := os.Stat(filepath.Join(dockerConfigDir, config.ConfigFileName)); stErr == nil {
configFile, err := config.Load(dockerConfigDir)
if err != nil {
@ -140,22 +141,29 @@ func PrepareImage(ctx context.Context, client *containerd.Client, imageName, sna
return empty, stErr
}
image, err := client.GetImage(ctx, imageName)
if err != nil {
if !errdefs.IsNotFound(err) {
var image containerd.Image
if pullAlways {
img, err := pullImage(ctx, client, resolver, imageName)
if err != nil {
return empty, err
}
rOpts := []containerd.RemoteOpt{
containerd.WithPullUnpack,
}
if resolver != nil {
rOpts = append(rOpts, containerd.WithResolver(resolver))
}
img, err := client.Pull(ctx, imageName, rOpts...)
if err != nil {
return empty, fmt.Errorf("cannot pull: %s", err)
}
image = img
} else {
img, err := client.GetImage(ctx, imageName)
if err != nil {
if !errdefs.IsNotFound(err) {
return empty, err
}
img, err := pullImage(ctx, client, resolver, imageName)
if err != nil {
return empty, err
}
image = img
} else {
image = img
}
}
unpacked, err := image.IsUnpacked(ctx, snapshotter)
@ -171,3 +179,21 @@ func PrepareImage(ctx context.Context, client *containerd.Client, imageName, sna
return image, nil
}
func pullImage(ctx context.Context, client *containerd.Client, resolver remotes.Resolver, imageName string) (containerd.Image, error) {
var empty containerd.Image
rOpts := []containerd.RemoteOpt{
containerd.WithPullUnpack,
}
if resolver != nil {
rOpts = append(rOpts, containerd.WithResolver(resolver))
}
img, err := client.Pull(ctx, imageName, rOpts...)
if err != nil {
return empty, fmt.Errorf("cannot pull: %s", err)
}
return img, nil
}

View File

@ -8,13 +8,13 @@ import (
"os"
"path"
"github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/oci"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
"github.com/containerd/containerd/namespaces"
"github.com/opencontainers/runtime-spec/specs-go"
@ -24,7 +24,8 @@ const (
defaultSnapshotter = "overlayfs"
workingDirectoryPermission = 0644
// faasdNamespace is the containerd namespace services are created
faasdNamespace = "default"
faasdNamespace = "default"
faasServicesPullAlways = false
)
type Service struct {
@ -88,7 +89,7 @@ func (s *Supervisor) Start(svcs []Service) error {
for _, svc := range svcs {
fmt.Printf("Preparing: %s with image: %s\n", svc.Name, svc.Image)
img, err := service.PrepareImage(ctx, s.client, svc.Image, defaultSnapshotter)
img, err := service.PrepareImage(ctx, s.client, svc.Image, defaultSnapshotter, faasServicesPullAlways)
if err != nil {
return err
}

View File

@ -0,0 +1,77 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"golang.org/x/sys/unix"
)
// Config of the container logs
type Config struct {
ID string
Namespace string
Stdout io.Reader
Stderr io.Reader
}
// LoggerFunc is implemented by custom v2 logging binaries
type LoggerFunc func(context.Context, *Config, func() error) error
// Run the logging driver
func Run(fn LoggerFunc) {
ctx, cancel := context.WithCancel(context.Background())
config := &Config{
ID: os.Getenv("CONTAINER_ID"),
Namespace: os.Getenv("CONTAINER_NAMESPACE"),
Stdout: os.NewFile(3, "CONTAINER_STDOUT"),
Stderr: os.NewFile(4, "CONTAINER_STDERR"),
}
var (
s = make(chan os.Signal, 32)
errCh = make(chan error, 1)
wait = os.NewFile(5, "CONTAINER_WAIT")
)
signal.Notify(s, unix.SIGTERM)
go func() {
if err := fn(ctx, config, wait.Close); err != nil {
errCh <- err
}
errCh <- nil
}()
for {
select {
case <-s:
cancel()
case err := <-errCh:
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
os.Exit(0)
}
}
}

191
vendor/github.com/coreos/go-systemd/LICENSE generated vendored Normal file
View File

@ -0,0 +1,191 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

5
vendor/github.com/coreos/go-systemd/NOTICE generated vendored Normal file
View File

@ -0,0 +1,5 @@
CoreOS Project
Copyright 2018 CoreOS, Inc
This product includes software developed at CoreOS, Inc.
(http://www.coreos.com/).

225
vendor/github.com/coreos/go-systemd/journal/journal.go generated vendored Normal file
View File

@ -0,0 +1,225 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package journal provides write bindings to the local systemd journal.
// It is implemented in pure Go and connects to the journal directly over its
// unix socket.
//
// To read from the journal, see the "sdjournal" package, which wraps the
// sd-journal a C API.
//
// http://www.freedesktop.org/software/systemd/man/systemd-journald.service.html
package journal
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"unsafe"
)
// Priority of a journal message
type Priority int
const (
PriEmerg Priority = iota
PriAlert
PriCrit
PriErr
PriWarning
PriNotice
PriInfo
PriDebug
)
var (
// This can be overridden at build-time:
// https://github.com/golang/go/wiki/GcToolchainTricks#including-build-information-in-the-executable
journalSocket = "/run/systemd/journal/socket"
// unixConnPtr atomically holds the local unconnected Unix-domain socket.
// Concrete safe pointer type: *net.UnixConn
unixConnPtr unsafe.Pointer
// onceConn ensures that unixConnPtr is initialized exactly once.
onceConn sync.Once
)
func init() {
onceConn.Do(initConn)
}
// Enabled checks whether the local systemd journal is available for logging.
func Enabled() bool {
onceConn.Do(initConn)
if (*net.UnixConn)(atomic.LoadPointer(&unixConnPtr)) == nil {
return false
}
if _, err := net.Dial("unixgram", journalSocket); err != nil {
return false
}
return true
}
// Send a message to the local systemd journal. vars is a map of journald
// fields to values. Fields must be composed of uppercase letters, numbers,
// and underscores, but must not start with an underscore. Within these
// restrictions, any arbitrary field name may be used. Some names have special
// significance: see the journalctl documentation
// (http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html)
// for more details. vars may be nil.
func Send(message string, priority Priority, vars map[string]string) error {
conn := (*net.UnixConn)(atomic.LoadPointer(&unixConnPtr))
if conn == nil {
return errors.New("could not initialize socket to journald")
}
socketAddr := &net.UnixAddr{
Name: journalSocket,
Net: "unixgram",
}
data := new(bytes.Buffer)
appendVariable(data, "PRIORITY", strconv.Itoa(int(priority)))
appendVariable(data, "MESSAGE", message)
for k, v := range vars {
appendVariable(data, k, v)
}
_, _, err := conn.WriteMsgUnix(data.Bytes(), nil, socketAddr)
if err == nil {
return nil
}
if !isSocketSpaceError(err) {
return err
}
// Large log entry, send it via tempfile and ancillary-fd.
file, err := tempFd()
if err != nil {
return err
}
defer file.Close()
_, err = io.Copy(file, data)
if err != nil {
return err
}
rights := syscall.UnixRights(int(file.Fd()))
_, _, err = conn.WriteMsgUnix([]byte{}, rights, socketAddr)
if err != nil {
return err
}
return nil
}
// Print prints a message to the local systemd journal using Send().
func Print(priority Priority, format string, a ...interface{}) error {
return Send(fmt.Sprintf(format, a...), priority, nil)
}
func appendVariable(w io.Writer, name, value string) {
if err := validVarName(name); err != nil {
fmt.Fprintf(os.Stderr, "variable name %s contains invalid character, ignoring\n", name)
}
if strings.ContainsRune(value, '\n') {
/* When the value contains a newline, we write:
* - the variable name, followed by a newline
* - the size (in 64bit little endian format)
* - the data, followed by a newline
*/
fmt.Fprintln(w, name)
binary.Write(w, binary.LittleEndian, uint64(len(value)))
fmt.Fprintln(w, value)
} else {
/* just write the variable and value all on one line */
fmt.Fprintf(w, "%s=%s\n", name, value)
}
}
// validVarName validates a variable name to make sure journald will accept it.
// The variable name must be in uppercase and consist only of characters,
// numbers and underscores, and may not begin with an underscore:
// https://www.freedesktop.org/software/systemd/man/sd_journal_print.html
func validVarName(name string) error {
if name == "" {
return errors.New("Empty variable name")
} else if name[0] == '_' {
return errors.New("Variable name begins with an underscore")
}
for _, c := range name {
if !(('A' <= c && c <= 'Z') || ('0' <= c && c <= '9') || c == '_') {
return errors.New("Variable name contains invalid characters")
}
}
return nil
}
// isSocketSpaceError checks whether the error is signaling
// an "overlarge message" condition.
func isSocketSpaceError(err error) bool {
opErr, ok := err.(*net.OpError)
if !ok || opErr == nil {
return false
}
sysErr, ok := opErr.Err.(*os.SyscallError)
if !ok || sysErr == nil {
return false
}
return sysErr.Err == syscall.EMSGSIZE || sysErr.Err == syscall.ENOBUFS
}
// tempFd creates a temporary, unlinked file under `/dev/shm`.
func tempFd() (*os.File, error) {
file, err := ioutil.TempFile("/dev/shm/", "journal.XXXXX")
if err != nil {
return nil, err
}
err = syscall.Unlink(file.Name())
if err != nil {
return nil, err
}
return file, nil
}
// initConn initializes the global `unixConnPtr` socket.
// It is meant to be called exactly once, at program startup.
func initConn() {
autobind, err := net.ResolveUnixAddr("unixgram", "")
if err != nil {
return
}
sock, err := net.ListenUnixgram("unixgram", autobind)
if err != nil {
return
}
atomic.StorePointer(&unixConnPtr, unsafe.Pointer(sock))
}

View File

@ -0,0 +1,144 @@
package logs
import (
"context"
"encoding/json"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/openfaas/faas-provider/httputil"
)
// Requester submits queries the logging system.
// This will be passed to the log handler constructor.
type Requester interface {
// Query submits a log request to the actual logging system.
Query(context.Context, Request) (<-chan Message, error)
}
// NewLogHandlerFunc creates an http HandlerFunc from the supplied log Requestor.
func NewLogHandlerFunc(requestor Requester, timeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
cn, ok := w.(http.CloseNotifier)
if !ok {
log.Println("LogHandler: response is not a CloseNotifier, required for streaming response")
http.NotFound(w, r)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
log.Println("LogHandler: response is not a Flusher, required for streaming response")
http.NotFound(w, r)
return
}
logRequest, err := parseRequest(r)
if err != nil {
log.Printf("LogHandler: could not parse request %s", err)
httputil.Errorf(w, http.StatusUnprocessableEntity, "could not parse the log request")
return
}
ctx, cancelQuery := context.WithTimeout(r.Context(), timeout)
defer cancelQuery()
messages, err := requestor.Query(ctx, logRequest)
if err != nil {
// add smarter error handling here
httputil.Errorf(w, http.StatusInternalServerError, "function log request failed")
return
}
// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set(http.CanonicalHeaderKey("Content-Type"), "application/x-ndjson")
w.WriteHeader(http.StatusOK)
flusher.Flush()
// ensure that we always try to send the closing chunk, not the inverted order due to how
// the defer stack works. We need two flush statements to ensure that the empty slice is
// sent as its own chunk
defer flusher.Flush()
defer w.Write([]byte{})
defer flusher.Flush()
jsonEncoder := json.NewEncoder(w)
for messages != nil {
select {
case <-cn.CloseNotify():
log.Println("LogHandler: client stopped listening")
return
case msg, ok := <-messages:
if !ok {
log.Println("LogHandler: end of log stream")
messages = nil
return
}
// serialize and write the msg to the http ResponseWriter
err := jsonEncoder.Encode(msg)
if err != nil {
// can't actually write the status header here so we should json serialize an error
// and return that because we have already sent the content type and status code
log.Printf("LogHandler: failed to serialize log message: '%s'\n", msg.String())
log.Println(err.Error())
// write json error message here ?
jsonEncoder.Encode(Message{Text: "failed to serialize log message"})
flusher.Flush()
return
}
flusher.Flush()
}
}
return
}
}
// parseRequest extracts the logRequest from the GET variables or from the POST body
func parseRequest(r *http.Request) (logRequest Request, err error) {
query := r.URL.Query()
logRequest.Name = getValue(query, "name")
logRequest.Namespace = getValue(query, "namespace")
logRequest.Instance = getValue(query, "instance")
tailStr := getValue(query, "tail")
if tailStr != "" {
logRequest.Tail, err = strconv.Atoi(tailStr)
if err != nil {
return logRequest, err
}
}
// ignore error because it will default to false if we can't parse it
logRequest.Follow, _ = strconv.ParseBool(getValue(query, "follow"))
sinceStr := getValue(query, "since")
if sinceStr != "" {
since, err := time.Parse(time.RFC3339, sinceStr)
logRequest.Since = &since
if err != nil {
return logRequest, err
}
}
return logRequest, nil
}
// getValue returns the value for the given key. If the key has more than one value, it returns the
// last value. if the value does not exist, it returns the empty string.
func getValue(queryValues url.Values, name string) string {
values := queryValues[name]
if len(values) == 0 {
return ""
}
return values[len(values)-1]
}

62
vendor/github.com/openfaas/faas-provider/logs/logs.go generated vendored Normal file
View File

@ -0,0 +1,62 @@
// Package logs provides the standard interface and handler for OpenFaaS providers to expose function logs.
//
// The package defines the Requester interface that OpenFaaS providers should implement and then expose using
// the predefined NewLogHandlerFunc. See the example folder for a minimal log provider implementation.
//
// The Requester is where the actual specific logic for connecting to and querying the log system should be implemented.
//
package logs
import (
"fmt"
"time"
)
// Request is the query to return the function logs.
type Request struct {
// Name is the function name and is required
Name string `json:"name"`
// Namespace is the namespace the function is deployed to, how a namespace is defined
// is faas-provider specific
Namespace string `json:"namespace"`
// Instance is the optional container name, that allows you to request logs from a specific function instance
Instance string `json:"instance"`
// Since is the optional datetime value to start the logs from
Since *time.Time `json:"since"`
// Tail sets the maximum number of log messages to return, <=0 means unlimited
Tail int `json:"tail"`
// Follow is allows the user to request a stream of logs until the timeout
Follow bool `json:"follow"`
}
// String implements that Stringer interface and prints the log Request in a consistent way that
// allows you to safely compare if two requests have the same value.
func (r Request) String() string {
return fmt.Sprintf(
"name:%s namespace: %s instance:%s since:%v tail:%d follow:%v",
r.Name, r.Namespace, r.Instance, r.Since, r.Tail, r.Follow,
)
}
// Message is a specific log message from a function container log stream
type Message struct {
// Name is the function name
Name string `json:"name"`
// Namespace is the namespace the function is deployed to, how a namespace is defined
// is faas-provider specific
Namespace string `json:"namespace"`
// instance is the name/id of the specific function instance
Instance string `json:"instance"`
// Timestamp is the timestamp of when the log message was recorded
Timestamp time.Time `json:"timestamp"`
// Text is the raw log message content
Text string `json:"text"`
}
// String implements the Stringer interface and allows for nice and simple string formatting of a log Message.
func (m Message) String() string {
return fmt.Sprintf(
"%s %s (%s %s) %s",
m.Timestamp.String(), m.Name, m.Namespace, m.Instance, m.Text,
)
}