From 49e88aa61dd8a99e17edf020faae2307b63858da Mon Sep 17 00:00:00 2001 From: jingrui Date: Sun, 10 Feb 2019 15:40:52 +0800 Subject: [PATCH] containerd: clean up container when containerd/dockerd is killed If containerd is killed while a task is being created (see Runtime.Create()), the deferred cleanup never runs and the shim is left behind; on reload, force-delete any task whose init pid is not positive. If dockerd is killed during docker stop in the post-stop hook, the reloaded task is treated as healthy as long as its shim still answers the client; write an init.exit file into the bundle when the init process exits so that an exiting task is force-deleted instead of reloaded. Exit events can also be lost: record each init exit as a marker file under the exit directory and, on startup, resend the pending markers until the moby event subscriber has received them. Signed-off-by: jingrui --- events/events.go | 14 +++ events/exchange/exchange.go | 12 +++ events/exit.go | 108 ++++++++++++++++++++ pkg/process/utils.go | 2 + runtime/v1/linux/runtime.go | 63 ++++++++++-- runtime/v1/linux/task.go | 27 ++++- runtime/v1/shim/service.go | 4 + vendor/github.com/docker/go-events/queue.go | 18 +++- 8 files changed, 232 insertions(+), 16 deletions(-) create mode 100644 events/exit.go diff --git a/events/events.go b/events/events.go index b7eb86f..70ef315 100644 --- a/events/events.go +++ b/events/events.go @@ -20,6 +20,7 @@ import ( "context" "time" + apievents "github.com/containerd/containerd/api/events" "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" ) @@ -32,6 +33,21 @@ type Envelope struct { Event *types.Any } +// ExitFile returns the exit marker name (see the package-level ExitFile) for a TaskExit +// envelope, or "" when the payload is another event type or cannot be decoded. +func (e *Envelope) ExitFile() string { + decoded, err := typeurl.UnmarshalAny(e.Event) + if err != nil { + return "" + } + + if te, ok := decoded.(*apievents.TaskExit); ok { + return ExitFile(te.ContainerID, te.Pid, te.ExitStatus) + } + + return "" +} + // Field returns the value for the given fieldpath as a string, if defined. // If the value is not defined, the second value will be false. 
func (e *Envelope) Field(fieldpath []string) (string, bool) { diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go index a1f385d..162e7be 100644 --- a/events/exchange/exchange.go +++ b/events/exchange/exchange.go @@ -49,6 +49,14 @@ func NewExchange() *Exchange { var _ events.Publisher = &Exchange{} var _ events.Forwarder = &Exchange{} var _ events.Subscriber = &Exchange{} +// mobySubscribed is set once Subscribe sees the moby task-event filter. +// NOTE(review): written in Subscribe and read from other goroutines without synchronization. +var mobySubscribed bool + +// MobySubscribed reports whether the moby task-event subscriber has registered. +func MobySubscribed() bool { + return mobySubscribed +} // Forward accepts an envelope to be directly distributed on the exchange. // @@ -161,6 +169,14 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even } + logrus.Infof("subscribe ctx=%v fs=%v", ctx, fs) + for _, s := range fs { + if !MobySubscribed() && s == "namespace==moby,topic~=|^/tasks/|" { + // Tag the queue before it joins the broadcaster, so this write happens before run() can read it. + queue.Namespace = "moby" + mobySubscribed = true + } + } e.broadcaster.Add(dst) go func() { defer closeAll() diff --git a/events/exit.go b/events/exit.go new file mode 100644 index 0000000..ee9d5a9 --- /dev/null +++ b/events/exit.go @@ -0,0 +1,125 @@ +/* +Use of this source code is governed by Apache-2.0 +license that can be found in the LICENSE file +Description: common functions +Author: jingrui +Create: 2019-02-12 +*/ + +package events + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/sirupsen/logrus" +) + +const ( + // ExitDir holds one sub-directory per namespace, each containing one marker file per recorded exit. + ExitDir = "/var/run/docker/containerd/exit" + // ExitStatusDefault is the status recorded when the real exit status is unknown (128 + SIGKILL). + ExitStatusDefault = 137 + // InitExit is the bundle file that marks a task whose init process has exited. + InitExit = "init.exit" +) + +// ExitFile builds the marker name "<cid>.<pid>.<status>" used under ExitDir. +func ExitFile(cid string, pid uint32, status uint32) string { + return fmt.Sprintf("%s.%d.%d", cid, pid, status) +} + +// ExitInfo parses a marker name built by ExitFile; it returns ("", 0, 0) when ef is malformed. +func ExitInfo(ef string) (string, uint32, uint32) { + s := strings.Split(ef, ".") + if len(s) != 3 { + return "", 0, 0 + } + + cid := s[0] + pid, err := strconv.ParseUint(s[1], 10, 32) + if err != nil { + return "", 0, 0 + } + status, err := strconv.ParseUint(s[2], 10, 32) + if err != nil { + return "", 0, 0 + } + + return cid, uint32(pid), uint32(status) +} + +// ExitAddFile creates the empty marker ef under ExitDir/ns and logs why it was added. +func ExitAddFile(ns string, ef
string, reason string) { + logrus.Devour(os.MkdirAll(filepath.Join(ExitDir, ns), 0700)) + err := ioutil.WriteFile(filepath.Join(ExitDir, ns, ef), []byte{}, 0600) + logrus.Infof("exit-add %s/%s [reason: %s] error=%v", ns, ef, reason, err) +} + +// ExitDelFile removes the marker ef from ExitDir/ns. +func ExitDelFile(ns string, ef string) { + err := os.RemoveAll(filepath.Join(ExitDir, ns, ef)) + logrus.Devour(err) + logrus.Infof("exit-del %s/%s error=%v", ns, ef, err) +} + +// ExitGetFile returns the marker name for (cid, pid, status) if it exists in ns, otherwise "". +func ExitGetFile(ns string, cid string, pid uint32, status uint32) string { + ef := ExitFile(cid, pid, status) + if _, err := os.Stat(filepath.Join(ExitDir, ns, ef)); err == nil { + return ef + } + return "" +} + +// ExitGetFiles lists the marker names in ns; a missing or unreadable directory yields an empty list. +func ExitGetFiles(ns string) []string { + files, err := ioutil.ReadDir(filepath.Join(ExitDir, ns)) + if err != nil { + return []string{} + } + + names := []string{} + for _, f := range files { + names = append(names, f.Name()) + } + + return names +} + +// ExitPending reports whether ns holds a marker for exactly this cid and pid, whatever its status. +func ExitPending(ns string, cid string, pid uint32) bool { + // Match the whole "<cid>.<pid>." prefix: a substring test would also hit containers + // whose id merely ends with cid, or pids that merely start with pid. + prefix := fmt.Sprintf("%s.%d.", cid, pid) + for _, ef := range ExitGetFiles(ns) { + if strings.HasPrefix(ef, prefix) { + return true + } + } + return false +} + +// InitExitWrite records pid in bundle/InitExit; if bundle is missing or the write fails it only logs. +func InitExitWrite(bundle string, pid int) { + if _, err := os.Stat(bundle); err != nil { + logrus.Infof("skip write init.exit %s error=%v", bundle, err) + return + } + err := ioutil.WriteFile(filepath.Join(bundle, InitExit), []byte(fmt.Sprintf("%d", pid)), 0600) + if err != nil { + logrus.Infof("failed write init.exit %s error=%v", bundle, err) + } +} + +// InitExitExist reports whether bundle/InitExit exists. +func InitExitExist(bundle string) bool { + if _, err := os.Stat(filepath.Join(bundle, InitExit)); err == nil { + return true + } + return false +} diff --git a/pkg/process/utils.go b/pkg/process/utils.go index afada02..5ff04ed 100644 --- a/pkg/process/utils.go +++ b/pkg/process/utils.go @@ -41,6 +41,8 @@ const ( RuncRoot = "/run/containerd/runc" // InitPidFile name of the file that contains the init pid InitPidFile = "init.pid" + // InitExit name of the file written once the init process has exited + InitExit = "init.exit" ) // safePid is a thread safe wrapper for pid.
diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index b6d5382..a6efd81 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" @@ -138,6 +139,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { return nil, err } } + go r.resendExitEvents(ic.Context, "moby") return r, nil } @@ -184,7 +186,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } defer func() { if err != nil { - bundle.Delete() + errd := bundle.Delete() + log.G(ctx).WithError(err).Errorf("revert: delete bundle error=%v", errd) } }() @@ -225,9 +228,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts deferCtx, deferCancel := context.WithTimeout( namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout) defer deferCancel() - if kerr := s.KillShim(deferCtx); kerr != nil { - log.G(ctx).WithError(kerr).Error("failed to kill shim") - } + kerr := s.KillShim(deferCtx) + log.G(ctx).WithError(err).Errorf("revert: kill shim error=%v", kerr) } }() @@ -338,6 +340,43 @@ func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error) return exit, nil } +// resendExitEvents republishes a TaskExit event for every exit marker under ns until no +// markers remain or ctx is done; the moby event queue deletes a marker once it is delivered. +func (r *Runtime) resendExitEvents(ctx context.Context, ns string) { + for ctx.Err() == nil { + time.Sleep(time.Second) + efs := events.ExitGetFiles(ns) + if len(efs) == 0 { + break + } + + if !exchange.MobySubscribed() { + logrus.Infof("waiting moby event stream ...") + continue + } + time.Sleep(time.Second) + + for _, ef := range efs { + cid, pid, status := events.ExitInfo(ef) + if cid == "" { + continue + } + + e := &eventstypes.TaskExit{ + ContainerID: cid, + ID: cid, + ExitStatus: status, + ExitedAt:
time.Now().UTC(), + Pid: pid, + } + + nsCtx := namespaces.WithNamespace(context.Background(), ns) + err := r.events.Publish(nsCtx, runtime.TaskExitEventTopic, e) + logrus.Infof("resend exit event %v error=%v", e, err) + } + } +} + func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { dir, err := os.ReadDir(filepath.Join(r.state, ns)) if err != nil { @@ -349,6 +388,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } id := path.Name() + log.G(ctx).Infof("load-task %s", id) // skip hidden directories if len(id) > 0 && id[0] == '.' { continue @@ -435,6 +475,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { log.G(ctx).WithError(err).Error("loading task type") continue } + if pid <= 0 { + _, err := t.DeleteForce(ctx, 0) + log.G(ctx).Warnf("delete force %s Pid=%d error=%v", id, pid, err) + continue + } + if events.InitExitExist(bundle.path) { + if !events.ExitPending(ns, t.id, uint32(pid)) { + events.ExitAddFile(ns, events.ExitFile(t.id, uint32(pid), uint32(events.ExitStatusDefault)), "cleanup dirty task") + } + _, err := t.DeleteForce(ctx, uint32(pid)) + log.G(ctx).Warnf("delete force %s Pid=%d(exiting) error=%v", id, pid, err) + continue + } + log.G(ctx).Infof("load-task %s Pid=%d done", id, pid) o = append(o, t) } return o, nil @@ -449,9 +503,6 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile)) ctx = namespaces.WithNamespace(ctx, ns) if err := r.terminate(ctx, bundle, ns, id); err != nil { - if r.config.ShimDebug { - return fmt.Errorf("failed to terminate task, leaving bundle for debugging: %w", err) - } log.G(ctx).WithError(err).Warn("failed to terminate task") } diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go index 5a8dab1..70908ae 100644 --- a/runtime/v1/linux/task.go +++
b/runtime/v1/linux/task.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/containerd/cgroups" eventstypes "github.com/containerd/containerd/api/events" @@ -39,6 +40,7 @@ import ( "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" ) // Task on a linux based system @@ -93,12 +95,14 @@ func (t *Task) PID(_ context.Context) (uint32, error) { } -// Delete the task and return the exit status -func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { +// delete removes the task from t.tasks and deletes its bundle. Unless force is set, a shim +// Delete error other than NotFound is returned before any cleanup is attempted. +func (t *Task) delete(ctx context.Context, force bool, pid uint32) (*runtime.Exit, error) { rsp, shimErr := t.shim.Delete(ctx, empty) if shimErr != nil { shimErr = errdefs.FromGRPC(shimErr) - if !errdefs.IsNotFound(shimErr) { + log.G(ctx).WithError(shimErr).Errorf("failed to delete container, force=%t", force) + if !force && !errdefs.IsNotFound(shimErr) { return nil, shimErr } } t.tasks.Delete(ctx, t.id) @@ -108,6 +112,14 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { if err := t.bundle.Delete(); err != nil { log.G(ctx).WithError(err).Error("failed to delete bundle") } + + if rsp == nil { + rsp = &shim.DeleteResponse{} + rsp.ExitStatus = 128 + uint32(unix.SIGKILL) + rsp.ExitedAt = time.Now().UTC() + rsp.Pid = pid + } + if shimErr != nil { return nil, shimErr } @@ -124,6 +136,17 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { }, nil } +// Delete the task and return the exit status +func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { + return t.delete(ctx, false, 0) +} + +// DeleteForce is Delete without the early abort on shim errors: cleanup always runs and any shim +// error is returned afterwards. pid only fills the exit built when the shim returns no response. +func (t *Task) DeleteForce(ctx context.Context, pid uint32) (*runtime.Exit, error) { + return t.delete(ctx, true, pid) +} + // Start the task func (t *Task) Start(ctx context.Context) error { t.mu.Lock() diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go index a08757d..b00ed9c 100644 --- a/runtime/v1/shim/service.go +++ b/runtime/v1/shim/service.go
@@ -520,6 +520,10 @@ func (s *Service) checkProcesses(e runc.Exit) { return } if ip, ok := p.(*process.Init); ok { + // Persist the exit for resending, and mark the bundle so a restarted containerd does not reload this task. + ns := filepath.Base(filepath.Dir(ip.Bundle)) + events.ExitAddFile(ns, events.ExitFile(s.id, uint32(e.Pid), uint32(e.Status)), "init exited") + events.InitExitWrite(ip.Bundle, e.Pid) // Ensure all children are killed if shouldKillAllOnExit(s.context, s.bundle) { if err := ip.KillAll(s.context); err != nil { diff --git a/vendor/github.com/docker/go-events/queue.go b/vendor/github.com/docker/go-events/queue.go index 4bb770a..5e83b40 100644 --- a/vendor/github.com/docker/go-events/queue.go +++ b/vendor/github.com/docker/go-events/queue.go @@ -4,6 +4,7 @@ import ( "container/list" "sync" + topevents "github.com/containerd/containerd/events" "github.com/sirupsen/logrus" ) @@ -11,11 +12,13 @@ import ( // by a sink. It is unbounded and thread safe but the sink must be reliable or // events will be dropped. type Queue struct { - dst Sink - events *list.List - cond *sync.Cond - mu sync.Mutex - closed bool + // Namespace, if non-empty, is the exit-marker namespace cleaned up after each delivered TaskExit. + Namespace string + dst Sink + events *list.List + cond *sync.Cond + mu sync.Mutex + closed bool } // NewQueue returns a queue to the provided Sink dst. @@ -83,6 +86,11 @@ func (eq *Queue) run() { "event": event, "sink": eq.dst, }).WithError(err).Debug("eventqueue: dropped event") + } else if e, ok := event.(*topevents.Envelope); ok && eq.Namespace != "" { + // Delivered: drop the pending exit marker so it is not resent. + if ef := e.ExitFile(); ef != "" { + topevents.ExitDelFile(eq.Namespace, ef) + } } } } -- 2.33.0