diff --git a/pkg/service/service.go b/pkg/service/service.go new file mode 100644 index 0000000..3b81a7d --- /dev/null +++ b/pkg/service/service.go @@ -0,0 +1,81 @@ +package service + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/errdefs" + "golang.org/x/sys/unix" +) + +// Remove removes a container +func Remove(ctx context.Context, client *containerd.Client, name string) error { + + container, containerErr := client.LoadContainer(ctx, name) + + if containerErr == nil { + found := true + t, err := container.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + found = false + } else { + return fmt.Errorf("unable to get task %s: ", err) + } + } + + if found { + status, _ := t.Status(ctx) + fmt.Printf("Status of %s is: %s\n", name, status.Status) + + log.Printf("Need to kill %s\n", name) + err := killTask(ctx, t) + if err != nil { + return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err) + } + } + + err = container.Delete(ctx, containerd.WithSnapshotCleanup) + if err != nil { + return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err) + } + } + return nil +} + +// From Stellar +func killTask(ctx context.Context, task containerd.Task) error { + wg := &sync.WaitGroup{} + wg.Add(1) + var err error + go func() { + defer wg.Done() + if task != nil { + wait, err := task.Wait(ctx) + if err != nil { + err = fmt.Errorf("error waiting on task: %s", err) + return + } + if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil { + log.Printf("error killing container task: %s", err) + } + select { + case <-wait: + task.Delete(ctx) + return + case <-time.After(5 * time.Second): + if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil { + log.Printf("error force killing container task: %s", err) + } + return + } + } + }() + wg.Wait() + + return err +} diff --git a/pkg/supervisor.go b/pkg/supervisor.go index a1cd59d..9393c30 100644 --- a/pkg/supervisor.go +++ b/pkg/supervisor.go @@ -8,15 +8,13 @@ import ( "os" "os/exec" "path" - "sync" - "time" + "github.com/alexellis/faasd/pkg/service" "github.com/alexellis/faasd/pkg/weave" "github.com/containerd/containerd" "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" - "golang.org/x/sys/unix" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/oci" @@ -48,7 +46,7 @@ func (s *Supervisor) Remove(svcs []Service) error { ctx := namespaces.WithNamespace(context.Background(), "default") for _, svc := range svcs { - err := removeContainer(ctx, s.client, svc.Name) + err := service.Remove(ctx, s.client, svc.Name) if err != nil { return err } @@ -87,13 +85,13 @@ func (s *Supervisor) Start(svcs []Service) error { for _, svc := range svcs { fmt.Printf("Reconciling: %s\n", svc.Name) - image := images[svc.Name] - - containerErr := removeContainer(ctx, s.client, svc.Name) + containerErr := service.Remove(ctx, s.client, svc.Name) if containerErr != nil { return containerErr } + image := images[svc.Name] + mounts := []specs.Mount{} if len(svc.Mounts) > 0 { for _, mnt := range svc.Mounts { @@ -274,70 +272,3 @@ func withOCIArgs(args []string) oci.SpecOpts { } } - -// From Stellar -func killTask(ctx context.Context, task containerd.Task) error { - wg := &sync.WaitGroup{} - wg.Add(1) - var err error - go func() { - defer wg.Done() - if task != nil { - wait, err := task.Wait(ctx) - if err != nil { - err = fmt.Errorf("error waiting on task: %s", err) - return - } - if err := task.Kill(ctx, unix.SIGTERM, containerd.WithKillAll); err != nil { - log.Printf("error killing container task: %s", err) - } - select { - case <-wait: - task.Delete(ctx) - return - case <-time.After(5 * time.Second): - if err := task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll); err != nil { - log.Printf("error force killing container task: %s", err) - } - return - } - } - }() - wg.Wait() - - return err -} - -func removeContainer(ctx context.Context, client *containerd.Client, name string) error { - - container, containerErr := client.LoadContainer(ctx, name) - - if containerErr == nil { - found := true - t, err := container.Task(ctx, nil) - if err != nil { - if errdefs.IsNotFound(err) { - found = false - } else { - return fmt.Errorf("unable to get task %s: ", err) - } - } - - if found { - status, _ := t.Status(ctx) - fmt.Printf("Status of %s is: %s\n", name, status.Status) - - log.Printf("Need to kill %s\n", name) - err := killTask(ctx, t) - if err != nil { - return fmt.Errorf("error killing task %s, %s, %s", container.ID(), name, err) - } - } - - err = container.Delete(ctx, containerd.WithSnapshotCleanup) - if err != nil { - return fmt.Errorf("error deleting container %s, %s, %s", container.ID(), name, err) - } - } - return nil -}