Extract Service struct

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd) 2019-12-28 18:32:13 +00:00
parent f3f6225674
commit 13d28bd2db
2 changed files with 86 additions and 74 deletions

81
pkg/service/service.go Normal file
View File

@ -0,0 +1,81 @@
package service
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"golang.org/x/sys/unix"
)
// Remove removes a container
func Remove(ctx context.Context, client *containerd.Client, name string) error {
container, containerErr := client.LoadContainer(ctx, name)
if containerErr == nil {
found := true
t, err := container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
found = false
} else {
return fmt.Errorf("unable to get task %s: ", err)
}
}
if found {
status, _ := t.Status(ctx)
fmt.Printf("Status of %s is: %s\n", name, status.Status)
log.Printf("Need to kill %s\n", name)
err := killTask(ctx, t)
if err != nil {
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
}
}
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
if err != nil {
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
}
}
return nil
}
// From Stellar
func killTask(ctx context.Context, task containerd.Task) error {
wg := &sync.WaitGroup{}
wg.Add(1)
var err error
go func() {
defer wg.Done()
if task != nil {
wait, err := task.Wait(ctx)
if err != nil {
err = fmt.Errorf("error waiting on task: %s", err)
return
}
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
log.Printf("error killing container task: %s", err)
}
select {
case <-wait:
task.Delete(ctx)
return
case <-time.After(5 * time.Second):
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
log.Printf("error force killing container task: %s", err)
}
return
}
}
}()
wg.Wait()
return err
}

View File

@ -8,15 +8,13 @@ import (
"os"
"os/exec"
"path"
"sync"
"time"
"github.com/alexellis/faasd/pkg/service"
"github.com/alexellis/faasd/pkg/weave"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
@ -48,7 +46,7 @@ func (s *Supervisor) Remove(svcs []Service) error {
ctx := namespaces.WithNamespace(context.Background(), "default")
for _, svc := range svcs {
err := removeContainer(ctx, s.client, svc.Name)
err := service.Remove(ctx, s.client, svc.Name)
if err != nil {
return err
}
@ -87,13 +85,13 @@ func (s *Supervisor) Start(svcs []Service) error {
for _, svc := range svcs {
fmt.Printf("Reconciling: %s\n", svc.Name)
image := images[svc.Name]
containerErr := removeContainer(ctx, s.client, svc.Name)
containerErr := service.Remove(ctx, s.client, svc.Name)
if containerErr != nil {
return containerErr
}
image := images[svc.Name]
mounts := []specs.Mount{}
if len(svc.Mounts) > 0 {
for _, mnt := range svc.Mounts {
@ -274,70 +272,3 @@ func withOCIArgs(args []string) oci.SpecOpts {
}
}
// From Stellar
func killTask(ctx context.Context, task containerd.Task) error {
wg := &sync.WaitGroup{}
wg.Add(1)
var err error
go func() {
defer wg.Done()
if task != nil {
wait, err := task.Wait(ctx)
if err != nil {
err = fmt.Errorf("error waiting on task: %s", err)
return
}
if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil {
log.Printf("error killing container task: %s", err)
}
select {
case <-wait:
task.Delete(ctx)
return
case <-time.After(5 * time.Second):
if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil {
log.Printf("error force killing container task: %s", err)
}
return
}
}
}()
wg.Wait()
return err
}
func removeContainer(ctx context.Context, client *containerd.Client, name string) error {
container, containerErr := client.LoadContainer(ctx, name)
if containerErr == nil {
found := true
t, err := container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
found = false
} else {
return fmt.Errorf("unable to get task %s: ", err)
}
}
if found {
status, _ := t.Status(ctx)
fmt.Printf("Status of %s is: %s\n", name, status.Status)
log.Printf("Need to kill %s\n", name)
err := killTask(ctx, t)
if err != nil {
return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err)
}
}
err = container.Delete(ctx, containerd.WithSnapshotCleanup)
if err != nil {
return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err)
}
}
return nil
}