476 lines
14 KiB
Diff
476 lines
14 KiB
Diff
From 49e88aa61dd8a99e17edf020faae2307b63858da Mon Sep 17 00:00:00 2001
|
|
From: jingrui <jingrui@huawei.com>
|
|
Date: Sun, 10 Feb 2019 15:40:52 +0800
|
|
Subject: [PATCH] containerd:cleanup container when containerd/dockerd is
|
|
killed
|
|
|
|
When containerd is killed during task creation (see Runtime.Create()), the
deferred cleanup function never runs, so the shim is left behind. Clean up
the shim for containers with pid=-1.
|
|
|
|
Also, if dockerd is killed during docker stop in the post-stop hook,
containerd will load the task and treat it as OK as long as the shim responds
to the client. Add init.exit to prevent loading a task that is already
exiting. The exit event may also be lost; fix that as well.
|
|
|
|
Signed-off-by: jingrui <jingrui@huawei.com>
|
|
---
|
|
events/events.go | 14 +++
|
|
events/exchange/exchange.go | 12 +++
|
|
events/exit.go | 108 ++++++++++++++++++++
|
|
pkg/process/utils.go | 2 +
|
|
runtime/v1/linux/runtime.go | 63 ++++++++++--
|
|
runtime/v1/linux/task.go | 27 ++++-
|
|
runtime/v1/shim/service.go | 4 +
|
|
vendor/github.com/docker/go-events/queue.go | 18 +++-
|
|
8 files changed, 232 insertions(+), 16 deletions(-)
|
|
create mode 100644 events/exit.go
|
|
|
|
diff --git a/events/events.go b/events/events.go
|
|
index b7eb86f..70ef315 100644
|
|
--- a/events/events.go
|
|
+++ b/events/events.go
|
|
@@ -20,6 +20,7 @@ import (
|
|
"context"
|
|
"time"
|
|
|
|
+ apievents "github.com/containerd/containerd/api/events"
|
|
"github.com/containerd/typeurl"
|
|
"github.com/gogo/protobuf/types"
|
|
)
|
|
@@ -32,6 +33,19 @@ type Envelope struct {
|
|
Event *types.Any
|
|
}
|
|
|
|
+// ExitFile returns the exit-file name that corresponds to this envelope.
+//
+// The envelope's event is decoded with typeurl.UnmarshalAny. When the decoded
+// value is a TaskExit event, the name is built by the package-level ExitFile
+// from the event's container ID, pid and exit status. An empty string is
+// returned when decoding fails or the event is of any other type.
+func (e *Envelope) ExitFile() string {
+	payload, err := typeurl.UnmarshalAny(e.Event)
+	if err != nil {
+		return ""
+	}
+
+	exit, isExit := payload.(*apievents.TaskExit)
+	if !isExit {
+		return ""
+	}
+
+	return ExitFile(exit.ContainerID, exit.Pid, exit.ExitStatus)
+}
|
|
+
|
|
// Field returns the value for the given fieldpath as a string, if defined.
|
|
// If the value is not defined, the second value will be false.
|
|
func (e *Envelope) Field(fieldpath []string) (string, bool) {
|
|
diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go
|
|
index a1f385d..162e7be 100644
|
|
--- a/events/exchange/exchange.go
|
|
+++ b/events/exchange/exchange.go
|
|
@@ -49,6 +49,11 @@ func NewExchange() *Exchange {
|
|
var _ events.Publisher = &Exchange{}
|
|
var _ events.Forwarder = &Exchange{}
|
|
var _ events.Subscriber = &Exchange{}
|
|
+// mobySubcribed records whether Subscribe has seen a subscriber using the
+// moby task filter. It is a plain package-level bool with no synchronization
+// of its own.
+var mobySubcribed bool
+
+// MobySubscribed reports the current value of mobySubcribed.
+func MobySubscribed() bool {
+	return mobySubcribed
+}
|
|
|
|
// Forward accepts an envelope to be directly distributed on the exchange.
|
|
//
|
|
@@ -161,6 +166,13 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
|
|
}
|
|
|
|
e.broadcaster.Add(dst)
|
|
+ logrus.Infof("subscribe ctx=%v fs=%v", ctx, fs)
|
|
+ for _, s := range fs {
|
|
+ if !MobySubscribed() && s == "namespace==moby,topic~=|^/tasks/|" {
|
|
+ queue.Namespace = "moby"
|
|
+ mobySubcribed = true
|
|
+ }
|
|
+ }
|
|
|
|
go func() {
|
|
defer closeAll()
|
|
diff --git a/events/exit.go b/events/exit.go
|
|
new file mode 100644
|
|
index 0000000..ee9d5a9
|
|
--- /dev/null
|
|
+++ b/events/exit.go
|
|
@@ -0,0 +1,108 @@
|
|
+/*
|
|
+Use of this source code is governed by Apache-2.0
|
|
+license that can be found in the LICENSE file
|
|
+Description: common functions
|
|
+Author: jingrui
|
|
+Create: 2019-02-12
|
|
+*/
|
|
+
|
|
+package events
|
|
+
|
|
+import (
|
|
+ "fmt"
|
|
+ "io/ioutil"
|
|
+ "os"
|
|
+ "path/filepath"
|
|
+ "strconv"
|
|
+ "strings"
|
|
+
|
|
+ "github.com/sirupsen/logrus"
|
|
+)
|
|
+
|
|
+const (
+	// ExitDir is the root directory for exit files; each namespace gets its
+	// own sub-directory beneath it.
+	ExitDir = "/var/run/docker/containerd/exit"
+
+	// ExitStatusDefault is the exit status used when the real status of a
+	// task is not known. NOTE(review): 137 looks like 128 + SIGKILL(9) —
+	// confirm.
+	ExitStatusDefault = 137
+
+	// InitExit is the name of the marker file written into a bundle
+	// directory; it holds the pid passed to InitExitWrite.
+	InitExit = "init.exit"
+)
|
|
+
|
|
+// ExitFile builds the exit-file name for a container: the container ID, the
+// pid and the exit status in decimal, joined by dots ("<cid>.<pid>.<status>").
+func ExitFile(cid string, pid uint32, status uint32) string {
+	pidText := strconv.FormatUint(uint64(pid), 10)
+	statusText := strconv.FormatUint(uint64(status), 10)
+	return cid + "." + pidText + "." + statusText
+}
|
|
+
|
|
+func ExitInfo(ef string) (string, uint32, uint32) {
|
|
+ s := strings.Split(ef, ".")
|
|
+ if len(s) != 3 {
|
|
+ return "", 0, 0
|
|
+ }
|
|
+
|
|
+ cid := s[0]
|
|
+ pid, err := strconv.ParseUint(s[1], 10, 32)
|
|
+ if err != nil {
|
|
+ return "", 0, 0
|
|
+ }
|
|
+ status, err := strconv.ParseUint(s[2], 10, 32)
|
|
+ if err != nil {
|
|
+ return "", 0, 0
|
|
+ }
|
|
+
|
|
+ return cid, uint32(pid), uint32(status)
|
|
+}
|
|
+
|
|
+// ExitAddFile creates the empty exit file ef for namespace ns under ExitDir,
+// creating the namespace directory first if needed. reason is only used in
+// the log line.
+//
+// Nothing is returned; the result of the write is reported in the log.
+func ExitAddFile(ns string, ef string, reason string) {
+	nsDir := filepath.Join(ExitDir, ns)
+	// NOTE(review): logrus.Devour is not defined in this file; presumably it
+	// discards the error — confirm against the vendored logrus.
+	logrus.Devour(os.MkdirAll(nsDir, 0700))
+
+	target := filepath.Join(ExitDir, ns, ef)
+	err := ioutil.WriteFile(target, []byte{}, 0600)
+	logrus.Infof("exit-add %s/%s [reason: %s] error=%v", ns, ef, reason, err)
+}
|
|
+
|
|
+// ExitDelFile removes the exit file ef of namespace ns from ExitDir and logs
+// the outcome. Removal uses os.RemoveAll on the joined path.
+func ExitDelFile(ns string, ef string) {
+	target := filepath.Join(ExitDir, ns, ef)
+	err := os.RemoveAll(target)
+	// NOTE(review): logrus.Devour is not defined in this file; presumably it
+	// discards the error — confirm against the vendored logrus.
+	logrus.Devour(err)
+	logrus.Infof("exit-del %s/%s error=%v", ns, ef, err)
+}
|
|
+
|
|
+// ExitGetFile returns the exit-file name for (cid, pid, status) when that
+// file can be stat'ed in namespace ns under ExitDir, and "" otherwise.
+func ExitGetFile(ns string, cid string, pid uint32, status uint32) string {
+	name := ExitFile(cid, pid, status)
+	_, err := os.Stat(filepath.Join(ExitDir, ns, name))
+	if err != nil {
+		return ""
+	}
+	return name
+}
|
|
+
|
|
+// ExitGetFiles lists the names of all entries in the exit directory of
+// namespace ns. It returns an empty, non-nil slice when the directory cannot
+// be read.
+func ExitGetFiles(ns string) []string {
+	entries, err := ioutil.ReadDir(filepath.Join(ExitDir, ns))
+	if err != nil {
+		return []string{}
+	}
+
+	names := make([]string, 0, len(entries))
+	for i := range entries {
+		names = append(names, entries[i].Name())
+	}
+
+	return names
+}
|
|
+
|
|
+// ExitPending reports whether namespace ns holds an exit file for container
+// cid with init pid pid, whatever exit status the file records.
+//
+// Exit files are named "<cid>.<pid>.<status>" (see ExitFile), so the match is
+// anchored on the "<cid>.<pid>." prefix. A plain substring test on
+// "<cid>.<pid>" would also match a different pid that merely starts with the
+// same digits (pid 1 vs. "cid.12.137"), or a longer container ID ending in
+// cid.
+func ExitPending(ns string, cid string, pid uint32) bool {
+	prefix := fmt.Sprintf("%s.%d.", cid, pid)
+	for _, ef := range ExitGetFiles(ns) {
+		if strings.HasPrefix(ef, prefix) {
+			return true
+		}
+	}
+	return false
+}
|
|
+
|
|
+// InitExitWrite records pid, in decimal, in the InitExit marker file inside
+// the bundle directory.
+//
+// Nothing is written when bundle cannot be stat'ed. That case and any write
+// failure are only logged; no error is returned to the caller.
+func InitExitWrite(bundle string, pid int) {
+	if _, err := os.Stat(bundle); err != nil {
+		logrus.Infof("skip write init.exit %s error=%v", bundle, err)
+		return
+	}
+
+	marker := filepath.Join(bundle, InitExit)
+	if err := ioutil.WriteFile(marker, []byte(strconv.Itoa(pid)), 0600); err != nil {
+		// Both the bundle and the error need a verb: with a single verb
+		// the bundle is printed as the "error" and the real error is
+		// appended as %!(EXTRA ...).
+		logrus.Infof("failed write init.exit %s error=%v", bundle, err)
+	}
+}
|
|
+
|
|
+// InitExitExist reports whether the InitExit marker file can be stat'ed
+// inside the bundle directory. Any stat error is treated as "does not exist".
+func InitExitExist(bundle string) bool {
+	_, err := os.Stat(filepath.Join(bundle, InitExit))
+	return err == nil
+}
|
|
diff --git a/pkg/process/utils.go b/pkg/process/utils.go
|
|
index afada02..5ff04ed 100644
|
|
--- a/pkg/process/utils.go
|
|
+++ b/pkg/process/utils.go
|
|
@@ -41,6 +41,8 @@ const (
|
|
RuncRoot = "/run/containerd/runc"
|
|
// InitPidFile name of the file that contains the init pid
|
|
InitPidFile = "init.pid"
|
|
+
|
|
+ InitExit = "init.exit"
|
|
)
|
|
|
|
// safePid is a thread safe wrapper for pid.
|
|
diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go
|
|
index b6d5382..a6efd81 100644
|
|
--- a/runtime/v1/linux/runtime.go
|
|
+++ b/runtime/v1/linux/runtime.go
|
|
@@ -32,6 +32,7 @@ import (
|
|
"github.com/containerd/containerd/api/types"
|
|
"github.com/containerd/containerd/containers"
|
|
"github.com/containerd/containerd/errdefs"
|
|
+ "github.com/containerd/containerd/events"
|
|
"github.com/containerd/containerd/events/exchange"
|
|
"github.com/containerd/containerd/identifiers"
|
|
"github.com/containerd/containerd/log"
|
|
@@ -138,6 +139,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|
return nil, err
|
|
}
|
|
}
|
|
+ go r.resendExitEvents(ic.Context, "moby")
|
|
return r, nil
|
|
}
|
|
|
|
@@ -184,7 +186,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
- bundle.Delete()
|
|
+ errd := bundle.Delete()
|
|
+ log.G(ctx).WithError(err).Errorf("revert: delete bundle error=%v", errd)
|
|
}
|
|
}()
|
|
|
|
@@ -225,9 +228,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
|
|
deferCtx, deferCancel := context.WithTimeout(
|
|
namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout)
|
|
defer deferCancel()
|
|
- if kerr := s.KillShim(deferCtx); kerr != nil {
|
|
- log.G(ctx).WithError(kerr).Error("failed to kill shim")
|
|
- }
|
|
+ kerr := s.KillShim(deferCtx)
|
|
+ log.G(ctx).WithError(err).Errorf("revert: kill shim error=%v", kerr)
|
|
}
|
|
}()
|
|
|
|
@@ -338,6 +340,41 @@ func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error)
|
|
return exit, nil
|
|
}
|
|
|
|
+// resendExitEvents re-publishes a TaskExit event for every exit file that is
+// present for namespace ns, and keeps doing so until no exit file is left.
+//
+// Each round starts after a one-second pause. While exchange.MobySubscribed
+// is false the round is skipped; once it is true the function waits one more
+// second and then publishes an event per parsable exit file. File names that
+// events.ExitInfo cannot parse are skipped. ExitedAt is the time of
+// re-sending, not the original exit time.
+//
+// The loop also ends when ctx is cancelled, so the goroutine started for it
+// can be stopped. NOTE(review): this assumes ctx lives as long as the
+// runtime — confirm for the caller's context. Publishing itself uses a
+// background context carrying ns, so an in-flight publish is not cancelled.
+func (r *Runtime) resendExitEvents(ctx context.Context, ns string) {
+	// pause waits for d and reports false when ctx was cancelled first.
+	pause := func(d time.Duration) bool {
+		timer := time.NewTimer(d)
+		defer timer.Stop()
+		select {
+		case <-ctx.Done():
+			return false
+		case <-timer.C:
+			return true
+		}
+	}
+
+	pubCtx := namespaces.WithNamespace(context.Background(), ns)
+
+	for pause(time.Second) {
+		efs := events.ExitGetFiles(ns)
+		if len(efs) == 0 {
+			return
+		}
+
+		if !exchange.MobySubscribed() {
+			logrus.Infof("waiting moby event stream ...")
+			continue
+		}
+		if !pause(time.Second) {
+			return
+		}
+
+		for _, ef := range efs {
+			cid, pid, status := events.ExitInfo(ef)
+			if cid == "" {
+				continue
+			}
+
+			e := &eventstypes.TaskExit{
+				ContainerID: cid,
+				ID:          cid,
+				ExitStatus:  status,
+				ExitedAt:    time.Now().UTC(),
+				Pid:         pid,
+			}
+
+			err := r.events.Publish(pubCtx, runtime.TaskExitEventTopic, e)
+			logrus.Infof("resend exit event %v error=%v", e, err)
+		}
+	}
+}
|
|
+
|
|
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
|
|
dir, err := os.ReadDir(filepath.Join(r.state, ns))
|
|
if err != nil {
|
|
@@ -349,6 +386,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
|
|
continue
|
|
}
|
|
id := path.Name()
|
|
+ log.G(ctx).Infof("load-task %s", id)
|
|
// skip hidden directories
|
|
if len(id) > 0 && id[0] == '.' {
|
|
continue
|
|
@@ -435,6 +473,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
|
|
log.G(ctx).WithError(err).Error("loading task type")
|
|
continue
|
|
}
|
|
+ if pid <= 0 {
|
|
+ _, err := t.DeleteForce(ctx, 0)
|
|
+ log.G(ctx).Warnf("delete force %s Pid=%d error=%v", id, pid, err)
|
|
+ continue
|
|
+ }
|
|
+ if _, err := os.Stat(filepath.Join(bundle.path, process.InitExit)); err == nil {
|
|
+ if !events.ExitPending(ns, t.id, uint32(pid)) {
|
|
+ events.ExitAddFile(ns, events.ExitFile(t.id, uint32(pid), uint32(events.ExitStatusDefault)), "cleanup dirty task")
|
|
+ }
|
|
+ _, err := t.DeleteForce(ctx, uint32(pid))
|
|
+ log.G(ctx).Warnf("delete force %s Pid=%d(exiting) error=%v", id, pid, err)
|
|
+ continue
|
|
+ }
|
|
+ log.G(ctx).Infof("load-task %s Pid=%d done", id, pid)
|
|
o = append(o, t)
|
|
}
|
|
return o, nil
|
|
@@ -449,9 +501,6 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns,
|
|
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
|
|
ctx = namespaces.WithNamespace(ctx, ns)
|
|
if err := r.terminate(ctx, bundle, ns, id); err != nil {
|
|
- if r.config.ShimDebug {
|
|
- return fmt.Errorf("failed to terminate task, leaving bundle for debugging: %w", err)
|
|
- }
|
|
log.G(ctx).WithError(err).Warn("failed to terminate task")
|
|
}
|
|
|
|
diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go
|
|
index 5a8dab1..70908ae 100644
|
|
--- a/runtime/v1/linux/task.go
|
|
+++ b/runtime/v1/linux/task.go
|
|
@@ -24,6 +24,7 @@ import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
+ "time"
|
|
|
|
"github.com/containerd/cgroups"
|
|
eventstypes "github.com/containerd/containerd/api/events"
|
|
@@ -39,6 +40,7 @@ import (
|
|
"github.com/containerd/typeurl"
|
|
"github.com/gogo/protobuf/types"
|
|
"github.com/sirupsen/logrus"
|
|
+ "golang.org/x/sys/unix"
|
|
)
|
|
|
|
// Task on a linux based system
|
|
@@ -93,12 +95,12 @@ func (t *Task) PID(_ context.Context) (uint32, error) {
|
|
}
|
|
|
|
// Delete the task and return the exit status
|
|
-func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
|
|
+func (t *Task) delete(ctx context.Context, force bool, pid uint32) (*runtime.Exit, error) {
|
|
rsp, shimErr := t.shim.Delete(ctx, empty)
|
|
if shimErr != nil {
|
|
- shimErr = errdefs.FromGRPC(shimErr)
|
|
- if !errdefs.IsNotFound(shimErr) {
|
|
- return nil, shimErr
|
|
+ log.G(ctx).WithError(shimErr).Error("failed to delete container, force=%t", force)
|
|
+ if !force {
|
|
+ return nil, errdefs.FromGRPC(shimErr)
|
|
}
|
|
}
|
|
t.tasks.Delete(ctx, t.id)
|
|
@@ -108,6 +110,14 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
|
|
if err := t.bundle.Delete(); err != nil {
|
|
log.G(ctx).WithError(err).Error("failed to delete bundle")
|
|
}
|
|
+
|
|
+ if rsp == nil {
|
|
+ rsp = &shim.DeleteResponse{}
|
|
+ rsp.ExitStatus = 128 + uint32(unix.SIGKILL)
|
|
+ rsp.ExitedAt = time.Now().UTC()
|
|
+ rsp.Pid = pid
|
|
+ }
|
|
+
|
|
if shimErr != nil {
|
|
return nil, shimErr
|
|
}
|
|
@@ -124,6 +134,15 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
|
|
}, nil
|
|
}
|
|
|
|
+// Delete the task and return the exit status
|
|
+func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
|
|
+ return t.delete(ctx, false, 0)
|
|
+}
|
|
+
|
|
+func (t *Task) DeleteForce(ctx context.Context, pid uint32) (*runtime.Exit, error) {
|
|
+ return t.delete(ctx, true, pid)
|
|
+}
|
|
+
|
|
// Start the task
|
|
func (t *Task) Start(ctx context.Context) error {
|
|
t.mu.Lock()
|
|
diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go
|
|
index a08757d..b00ed9c 100644
|
|
--- a/runtime/v1/shim/service.go
|
|
+++ b/runtime/v1/shim/service.go
|
|
@@ -23,6 +23,7 @@ import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
+ "io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
@@ -520,6 +521,9 @@ func (s *Service) checkProcesses(e runc.Exit) {
|
|
return
|
|
}
|
|
if ip, ok := p.(*process.Init); ok {
|
|
+ ns := filepath.Base(filepath.Dir(ip.Bundle))
|
|
+ events.ExitAddFile(ns, events.ExitFile(s.id, uint32(e.Pid), uint32(e.Status)), "init exited")
|
|
+ ioutil.WriteFile(filepath.Join(ip.Bundle, process.InitExit), []byte(fmt.Sprintf("%d", e.Pid)), 0600)
|
|
// Ensure all children are killed
|
|
if shouldKillAllOnExit(s.context, s.bundle) {
|
|
if err := ip.KillAll(s.context); err != nil {
|
|
diff --git a/vendor/github.com/docker/go-events/queue.go b/vendor/github.com/docker/go-events/queue.go
|
|
index 4bb770a..5e83b40 100644
|
|
--- a/vendor/github.com/docker/go-events/queue.go
|
|
+++ b/vendor/github.com/docker/go-events/queue.go
|
|
@@ -4,6 +4,7 @@ import (
|
|
"container/list"
|
|
"sync"
|
|
|
|
+ topevents "github.com/containerd/containerd/events"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
@@ -11,11 +12,12 @@ import (
|
|
// by a sink. It is unbounded and thread safe but the sink must be reliable or
|
|
// events will be dropped.
|
|
type Queue struct {
|
|
- dst Sink
|
|
- events *list.List
|
|
- cond *sync.Cond
|
|
- mu sync.Mutex
|
|
- closed bool
|
|
+ Namespace string
|
|
+ dst Sink
|
|
+ events *list.List
|
|
+ cond *sync.Cond
|
|
+ mu sync.Mutex
|
|
+ closed bool
|
|
}
|
|
|
|
// NewQueue returns a queue to the provided Sink dst.
|
|
@@ -83,6 +85,12 @@ func (eq *Queue) run() {
|
|
"event": event,
|
|
"sink": eq.dst,
|
|
}).WithError(err).Debug("eventqueue: dropped event")
|
|
+ } else {
|
|
+ if e, ok := event.(*topevents.Envelope); ok {
|
|
+ if ef := e.ExitFile(); ef != "" {
|
|
+ topevents.ExitDelFile(eq.Namespace, ef)
|
|
+ }
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
--
|
|
2.33.0
|
|
|