Compare commits

..

2 Commits
0.1.4 ... 0.1.5

Author SHA1 Message Date
abc41d2108 Update queue-worker to print body
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-23 20:53:11 +00:00
9ba64783f9 Enable NATS, speed-up restart
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
2019-12-23 20:43:00 +00:00
4 changed files with 83 additions and 30 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/faasd
hosts
/resolv.conf

View File

@ -20,18 +20,13 @@ func runUp(_ *cobra.Command, _ []string) error {
wd, _ := os.Getwd()
svcs := []pkg.Service{
// pkg.Service{
// Name: "faas-containerd",
// Env: []string{"snapshotter=overlayfs"},
// Image: "docker.io/alexellis2/faas-containerd:0.3.2",
// Mounts: []pkg.Mount{
// pkg.Mount{
// Src: "/run/containerd/containerd.sock",
// Dest: "/run/containerd/containerd.sock",
// },
// },
// Caps: []string{"CAP_SYS_ADMIN", "CAP_NET_RAW"},
// },
pkg.Service{
Name: "nats",
Env: []string{""},
Image: "docker.io/library/nats-streaming:0.11.2",
Caps: []string{},
Args: []string{"/nats-streaming-server", "-m", "8222", "--store=memory", "--cluster_id=faas-cluster"},
},
pkg.Service{
Name: "prometheus",
Env: []string{},
@ -53,11 +48,28 @@ func runUp(_ *cobra.Command, _ []string) error {
"read_timeout=60s",
"write_timeout=60s",
"upstream_timeout=65s",
"faas_nats_address=nats",
"faas_nats_port=4222",
},
Image: "docker.io/openfaas/gateway:0.17.4",
Mounts: []pkg.Mount{},
Caps: []string{"CAP_NET_RAW"},
},
pkg.Service{
Name: "queue-worker",
Env: []string{
"faas_nats_address=nats",
"faas_nats_port=4222",
"gateway_invoke=true",
"faas_gateway_address=gateway",
"ack_wait=5m5s",
"max_inflight=1",
"faas_print_body=true",
},
Image: "docker.io/openfaas/queue-worker:0.8.4",
Mounts: []pkg.Mount{},
Caps: []string{"CAP_NET_RAW"},
},
}
start := time.Now()

View File

@ -8,7 +8,7 @@ import (
"os"
"os/exec"
"path"
"syscall"
"sync"
"time"
"github.com/alexellis/faasd/pkg/weave"
@ -16,6 +16,7 @@ import (
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
@ -60,7 +61,7 @@ func (s *Supervisor) Start(svcs []Service) error {
images := map[string]containerd.Image{}
for _, svc := range svcs {
fmt.Printf("Preparing: %s", svc.Name)
fmt.Printf("Preparing: %s\n", svc.Name)
img, err := prepareImage(ctx, s.client, svc.Image)
if err != nil {
@ -94,20 +95,13 @@ func (s *Supervisor) Start(svcs []Service) error {
status, _ := t.Status(ctx)
fmt.Println("Status:", status.Status)
if status.Status == containerd.Running {
log.Println("need to kill", svc.Name)
err = t.Kill(ctx, syscall.SIGTERM)
if err != nil {
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), svc.Name, err)
}
time.Sleep(5 * time.Second)
}
_, err = t.Delete(ctx)
// if status.Status == containerd.Running {
log.Println("need to kill", svc.Name)
err := killTask(ctx, t)
if err != nil {
return fmt.Errorf("error deleting task %s, %s, %s", container.ID(), svc.Name, err)
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), svc.Name, err)
}
// }
}
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
@ -173,6 +167,7 @@ func (s *Supervisor) Start(svcs []Service) error {
containerd.WithNewSpec(oci.WithImageConfig(image),
oci.WithCapabilities(svc.Caps),
oci.WithMounts(mounts),
withOCIArgs(svc.Args),
hook,
oci.WithEnv(svc.Env)),
)
@ -182,7 +177,7 @@ func (s *Supervisor) Start(svcs []Service) error {
return containerCreateErr
}
fmt.Println("created", newContainer.ID())
log.Printf("Created container %s\n", newContainer.ID())
task, err := newContainer.NewTask(ctx, cio.NewCreator(cio.WithStdio))
if err != nil {
@ -200,7 +195,7 @@ func (s *Supervisor) Start(svcs []Service) error {
writeErr := ioutil.WriteFile("hosts", hosts, 0644)
if writeErr != nil {
fmt.Println("Error writing hosts file")
log.Println("Error writing hosts file")
}
// os.Chown("hosts", 101, 101)
@ -277,9 +272,55 @@ type Service struct {
Name string
Mounts []Mount
Caps []string
Args []string
}
type Mount struct {
Src string
Dest string
}
func withOCIArgs(args []string) oci.SpecOpts {
if len(args) > 0 {
return oci.WithProcessArgs(args...)
}
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *oci.Spec) error {
return nil
}
}
// From Stellar
func killTask(ctx context.Context, task containerd.Task) error {
wg := &sync.WaitGroup{}
wg.Add(1)
var err error
go func() {
defer wg.Done()
if task != nil {
wait, err := task.Wait(ctx)
if err != nil {
err = fmt.Errorf("error waiting on task: %s", err)
return
}
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
log.Printf("error killing container task: %s", err)
}
select {
case <-wait:
task.Delete(ctx)
return
case <-time.After(5 * time.Second):
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
log.Printf("error force killing container task: %s", err)
}
return
}
}
}()
wg.Wait()
return err
}

View File

@ -1,2 +1 @@
nameserver 8.8.8.8
nameserver 8.8.8.8