containerd/patch/0002-containerd-cleanup-container-when-containerd-dockerd.patch
zhongjiawei 4a1d8da417 containerd:add patch for 1.6.22
Signed-off-by: zhongjiawei <zhongjiawei1@huawei.com>
2023-09-08 15:52:11 +08:00

476 lines
14 KiB
Diff

From 49e88aa61dd8a99e17edf020faae2307b63858da Mon Sep 17 00:00:00 2001
From: jingrui <jingrui@huawei.com>
Date: Sun, 10 Feb 2019 15:40:52 +0800
Subject: [PATCH] containerd:cleanup container when containerd/dockerd is
killed
When containerd is killed during task creation (see Runtime.Create()),
its deferred cleanup never runs, leaving a residual shim. Clean up the
shim for containers whose pid is -1.
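Also, if dockerd is killed during docker stop in a poststop hook,
containerd reloads the task and treats it as healthy as long as the shim
responds. Add an init.exit marker so that an exiting task is not loaded.
Exit events may also be lost; fix this by recording them as exit files
and resending them.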
Signed-off-by: jingrui <jingrui@huawei.com>
---
events/events.go | 14 +++
events/exchange/exchange.go | 12 +++
events/exit.go | 108 ++++++++++++++++++++
pkg/process/utils.go | 2 +
runtime/v1/linux/runtime.go | 63 ++++++++++--
runtime/v1/linux/task.go | 27 ++++-
runtime/v1/shim/service.go | 4 +
vendor/github.com/docker/go-events/queue.go | 18 +++-
8 files changed, 232 insertions(+), 16 deletions(-)
create mode 100644 events/exit.go
diff --git a/events/events.go b/events/events.go
index b7eb86f..70ef315 100644
--- a/events/events.go
+++ b/events/events.go
@@ -20,6 +20,7 @@ import (
"context"
"time"
+ apievents "github.com/containerd/containerd/api/events"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
)
@@ -32,6 +33,19 @@ type Envelope struct {
Event *types.Any
}
+// ExitFile returns the exit-file name for a TaskExit event, or "" for any other or undecodable event.
+func (e *Envelope) ExitFile() string {
+ decoded, err := typeurl.UnmarshalAny(e.Event)
+ if err != nil {
+ return ""
+ }
+
+ if e, ok := decoded.(*apievents.TaskExit); ok {
+ return ExitFile(e.ContainerID, e.Pid, e.ExitStatus)
+ }
+ return ""
+}
+
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (e *Envelope) Field(fieldpath []string) (string, bool) {
diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go
index a1f385d..162e7be 100644
--- a/events/exchange/exchange.go
+++ b/events/exchange/exchange.go
@@ -49,6 +49,11 @@ func NewExchange() *Exchange {
var _ events.Publisher = &Exchange{}
var _ events.Forwarder = &Exchange{}
var _ events.Subscriber = &Exchange{}
+var mobySubcribed = false // NOTE(review): written in Subscribe and read from the runtime's resendExitEvents goroutine without synchronization
+// MobySubscribed reports whether a subscriber using moby's task-event filter has connected since startup.
+func MobySubscribed() bool {
+ return mobySubcribed
+}
// Forward accepts an envelope to be directly distributed on the exchange.
//
@@ -161,6 +166,13 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
}
e.broadcaster.Add(dst)
+ logrus.Infof("subscribe ctx=%v fs=%v", ctx, fs)
+ for _, s := range fs {
+ if s == "namespace==moby,topic~=|^/tasks/|" { // tag every moby subscription, including re-subscriptions after a dockerd restart
+ queue.Namespace = "moby" // lets this queue delete moby exit files once their events are delivered
+ mobySubcribed = true
+ }
+ }
go func() {
defer closeAll()
diff --git a/events/exit.go b/events/exit.go
new file mode 100644
index 0000000..ee9d5a9
--- /dev/null
+++ b/events/exit.go
@@ -0,0 +1,108 @@
+/*
+Use of this source code is governed by Apache-2.0
+license that can be found in the LICENSE file
+Description: common functions
+Author: jingrui
+Create: 2019-02-12
+*/
+
+package events
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+
+ "github.com/sirupsen/logrus"
+)
+
+const ExitDir = "/var/run/docker/containerd/exit" // exit files live in ExitDir/<namespace>/
+const ExitStatusDefault = 137 // 128+SIGKILL; status recorded when a dirty task is cleaned up at load time
+const InitExit = "init.exit" // marker file written into a task bundle when its init process exits
+// ExitFile returns the exit-file name "<cid>.<pid>.<status>" for the given container, pid and exit status.
+func ExitFile(cid string, pid uint32, status uint32) string {
+ return fmt.Sprintf("%s.%d.%d", cid, pid, status)
+}
+// ExitInfo parses an exit-file name "<cid>.<pid>.<status>"; it returns ("", 0, 0) if ef is malformed.
+func ExitInfo(ef string) (string, uint32, uint32) {
+ s := strings.Split(ef, ".")
+ if len(s) < 3 {
+ return "", 0, 0
+ }
+ // pid and status are the last two fields; everything before them is the container ID, which may contain '.'.
+ cid := strings.Join(s[:len(s)-2], ".")
+ pid, err := strconv.ParseUint(s[len(s)-2], 10, 32)
+ if err != nil {
+ return "", 0, 0
+ }
+ status, err := strconv.ParseUint(s[len(s)-1], 10, 32)
+ if err != nil {
+ return "", 0, 0
+ }
+
+ return cid, uint32(pid), uint32(status)
+}
+// ExitAddFile creates the empty exit file ExitDir/ns/ef, creating the directory first; failures are logged, not returned.
+func ExitAddFile(ns string, ef string, reason string) {
+ logrus.Devour(os.MkdirAll(filepath.Join(ExitDir, ns), 0700)) // NOTE(review): Devour is not upstream logrus API; presumably a vendored helper that logs the error
+ err := ioutil.WriteFile(filepath.Join(ExitDir, ns, ef), []byte{}, 0600)
+ logrus.Infof("exit-add %s/%s [reason: %s] error=%v", ns, ef, reason, err)
+}
+// ExitDelFile removes ExitDir/ns/ef via os.RemoveAll (so a missing file is not an error); failures are logged, not returned.
+func ExitDelFile(ns string, ef string) {
+ err := os.RemoveAll(filepath.Join(ExitDir, ns, ef))
+ logrus.Devour(err)
+ logrus.Infof("exit-del %s/%s error=%v", ns, ef, err)
+}
+// ExitGetFile returns the exit-file name for (cid, pid, status) if that file exists in ExitDir/ns, otherwise "".
+func ExitGetFile(ns string, cid string, pid uint32, status uint32) string {
+ ef := ExitFile(cid, pid, status)
+ if _, err := os.Stat(filepath.Join(ExitDir, ns, ef)); err == nil {
+ return ef
+ }
+ return ""
+}
+// ExitGetFiles lists the entry names in ExitDir/ns (sorted by ioutil.ReadDir), or an empty slice if the directory cannot be read.
+func ExitGetFiles(ns string) []string {
+ files, err := ioutil.ReadDir(filepath.Join(ExitDir, ns))
+ if err != nil {
+ return []string{}
+ }
+
+ names := []string{}
+ for _, f := range files {
+ names = append(names, f.Name())
+ }
+
+ return names
+}
+// ExitPending reports whether namespace ns already holds an exit file for container cid and pid, whatever its status.
+func ExitPending(ns string, cid string, pid uint32) bool {
+ for _, ef := range ExitGetFiles(ns) {
+ if strings.HasPrefix(ef, fmt.Sprintf("%s.%d.", cid, pid)) { // anchored, with trailing '.', so pid 12 cannot match pid 123
+ return true
+ }
+ }
+ return false
+}
+// InitExitWrite writes pid into bundle/init.exit; it is skipped if bundle cannot be stat'ed, and write errors are only logged.
+func InitExitWrite(bundle string, pid int) {
+ if _, err := os.Stat(bundle); err != nil {
+ logrus.Infof("skip write init.exit %s error=%v", bundle, err)
+ return
+ }
+ err := ioutil.WriteFile(filepath.Join(bundle, InitExit), []byte(strconv.Itoa(pid)), 0600)
+ if err != nil {
+ logrus.Infof("failed write init.exit %s error=%v", bundle, err)
+ }
+}
+// InitExitExist reports whether bundle contains an init.exit marker file.
+func InitExitExist(bundle string) bool {
+ if _, err := os.Stat(filepath.Join(bundle, InitExit)); err == nil {
+ return true
+ }
+ return false
+}
diff --git a/pkg/process/utils.go b/pkg/process/utils.go
index afada02..5ff04ed 100644
--- a/pkg/process/utils.go
+++ b/pkg/process/utils.go
@@ -41,6 +41,8 @@ const (
RuncRoot = "/run/containerd/runc"
// InitPidFile name of the file that contains the init pid
InitPidFile = "init.pid"
+ // InitExit name of the marker file written to a task bundle when its init process exits
+ InitExit = "init.exit"
)
// safePid is a thread safe wrapper for pid.
diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go
index b6d5382..a6efd81 100644
--- a/runtime/v1/linux/runtime.go
+++ b/runtime/v1/linux/runtime.go
@@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
+ "github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
@@ -138,6 +139,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
}
+ go r.resendExitEvents(ic.Context, "moby") // replay exit events persisted as exit files for the moby namespace
return r, nil
}
@@ -184,7 +186,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}
defer func() {
if err != nil {
- bundle.Delete()
+ errd := bundle.Delete()
+ log.G(ctx).WithError(err).Errorf("revert: delete bundle error=%v", errd) // logged even when errd is nil; err is the failure being reverted
}
}()
@@ -225,9 +228,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
deferCtx, deferCancel := context.WithTimeout(
namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout)
defer deferCancel()
- if kerr := s.KillShim(deferCtx); kerr != nil {
- log.G(ctx).WithError(kerr).Error("failed to kill shim")
- }
+ kerr := s.KillShim(deferCtx)
+ log.G(ctx).WithError(err).Errorf("revert: kill shim error=%v", kerr) // logged even when kerr is nil
}
}()
@@ -338,6 +340,41 @@ func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error)
return exit, nil
}
+// resendExitEvents republishes a TaskExit event for every exit file in namespace ns, once moby has subscribed, until none remain.
+func (r *Runtime) resendExitEvents(ctx context.Context, ns string) {
+ for { // NOTE(review): the ctx parameter is never used (it is shadowed below), so this loop ignores cancellation
+ time.Sleep(time.Second)
+ efs := events.ExitGetFiles(ns)
+ if len(efs) == 0 {
+ break
+ }
+
+ if !exchange.MobySubscribed() {
+ logrus.Infof("waiting moby event stream ...")
+ continue
+ }
+ time.Sleep(time.Second)
+
+ for _, ef := range efs {
+ cid, pid, status := events.ExitInfo(ef)
+ if cid == "" {
+ events.ExitDelFile(ns, ef) // an unparsable entry is never delivered or removed, so it would keep this loop alive forever
+ continue
+ }
+ e := &eventstypes.TaskExit{
+ ContainerID: cid,
+ ID: cid,
+ ExitStatus: status,
+ ExitedAt: time.Now().UTC(), // the real exit time is not encoded in the exit-file name
+ Pid: pid,
+ }
+ ctx := namespaces.WithNamespace(context.Background(), ns)
+ err := r.events.Publish(ctx, runtime.TaskExitEventTopic, e)
+ logrus.Infof("resend exit event %v error=%v", e, err)
+ }
+ }
+}
+
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := os.ReadDir(filepath.Join(r.state, ns))
if err != nil {
@@ -349,6 +386,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue
}
id := path.Name()
+ log.G(ctx).Infof("load-task %s", id)
// skip hidden directories
if len(id) > 0 && id[0] == '.' {
continue
@@ -435,6 +473,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
log.G(ctx).WithError(err).Error("loading task type")
continue
}
+ if pid <= 0 { // no valid init pid (e.g. containerd died during create): force-delete the leftover task instead of loading it
+ _, err := t.DeleteForce(ctx, 0)
+ log.G(ctx).Warnf("delete force %s Pid=%d error=%v", id, pid, err)
+ continue
+ }
+ if _, err := os.Stat(filepath.Join(bundle.path, process.InitExit)); err == nil { // init already exited: make sure an exit file exists, then force-delete rather than load
+ if !events.ExitPending(ns, t.id, uint32(pid)) {
+ events.ExitAddFile(ns, events.ExitFile(t.id, uint32(pid), uint32(events.ExitStatusDefault)), "cleanup dirty task")
+ }
+ _, err := t.DeleteForce(ctx, uint32(pid))
+ log.G(ctx).Warnf("delete force %s Pid=%d(exiting) error=%v", id, pid, err)
+ continue
+ }
+ log.G(ctx).Infof("load-task %s Pid=%d done", id, pid)
o = append(o, t)
}
return o, nil
@@ -449,9 +501,6 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns,
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
- if r.config.ShimDebug {
- return fmt.Errorf("failed to terminate task, leaving bundle for debugging: %w", err)
- }
log.G(ctx).WithError(err).Warn("failed to terminate task")
}
diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go
index 5a8dab1..70908ae 100644
--- a/runtime/v1/linux/task.go
+++ b/runtime/v1/linux/task.go
@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"sync"
+ "time"
"github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events"
@@ -39,6 +40,7 @@ import (
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
"github.com/sirupsen/logrus"
+ "golang.org/x/sys/unix"
)
// Task on a linux based system
@@ -93,12 +95,12 @@ func (t *Task) PID(_ context.Context) (uint32, error) {
}
// Delete the task and return the exit status
-func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
+func (t *Task) delete(ctx context.Context, force bool, pid uint32) (*runtime.Exit, error) {
rsp, shimErr := t.shim.Delete(ctx, empty)
if shimErr != nil {
- shimErr = errdefs.FromGRPC(shimErr)
- if !errdefs.IsNotFound(shimErr) {
- return nil, shimErr
+ log.G(ctx).WithError(shimErr).Errorf("failed to delete container, force=%t", force)
+ if shimErr = errdefs.FromGRPC(shimErr); !force && !errdefs.IsNotFound(shimErr) { // a NotFound from the shim still falls through to cleanup
+ return nil, shimErr
}
}
t.tasks.Delete(ctx, t.id)
@@ -108,6 +110,14 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
if err := t.bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("failed to delete bundle")
}
+
+ if rsp == nil { // NOTE(review): when shimErr != nil this synthesized response is discarded by the return below
+ rsp = &shim.DeleteResponse{}
+ rsp.ExitStatus = 128 + uint32(unix.SIGKILL)
+ rsp.ExitedAt = time.Now().UTC()
+ rsp.Pid = pid
+ }
+
if shimErr != nil {
return nil, shimErr
}
@@ -124,6 +134,15 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
}, nil
}
+// Delete the task and return the exit status
+func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
+ return t.delete(ctx, false, 0)
+}
+// DeleteForce removes the task and its bundle even when the shim's Delete fails; pid is used only if no shim response is available.
+func (t *Task) DeleteForce(ctx context.Context, pid uint32) (*runtime.Exit, error) {
+ return t.delete(ctx, true, pid)
+}
+
// Start the task
func (t *Task) Start(ctx context.Context) error {
t.mu.Lock()
diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go
index a08757d..b00ed9c 100644
--- a/runtime/v1/shim/service.go
+++ b/runtime/v1/shim/service.go
@@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "io/ioutil"
"os"
"path/filepath"
"sync"
@@ -520,6 +521,9 @@ func (s *Service) checkProcesses(e runc.Exit) {
return
}
if ip, ok := p.(*process.Init); ok {
+ ns := filepath.Base(filepath.Dir(ip.Bundle)) // assumes bundle paths end in <namespace>/<id> — TODO confirm
+ events.ExitAddFile(ns, events.ExitFile(s.id, uint32(e.Pid), uint32(e.Status)), "init exited")
+ ioutil.WriteFile(filepath.Join(ip.Bundle, process.InitExit), []byte(fmt.Sprintf("%d", e.Pid)), 0600) // NOTE(review): write error is ignored; events.InitExitWrite does an equivalent write and logs failures
// Ensure all children are killed
if shouldKillAllOnExit(s.context, s.bundle) {
if err := ip.KillAll(s.context); err != nil {
diff --git a/vendor/github.com/docker/go-events/queue.go b/vendor/github.com/docker/go-events/queue.go
index 4bb770a..5e83b40 100644
--- a/vendor/github.com/docker/go-events/queue.go
+++ b/vendor/github.com/docker/go-events/queue.go
@@ -4,6 +4,7 @@ import (
"container/list"
"sync"
+ topevents "github.com/containerd/containerd/events"
"github.com/sirupsen/logrus"
)
@@ -11,11 +12,12 @@ import (
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type Queue struct {
- dst Sink
- events *list.List
- cond *sync.Cond
- mu sync.Mutex
- closed bool
+ Namespace string // exit-file namespace; set only on queues tagged by the containerd exchange (moby subscribers)
+ dst Sink
+ events *list.List
+ cond *sync.Cond
+ mu sync.Mutex
+ closed bool
}
// NewQueue returns a queue to the provided Sink dst.
@@ -83,6 +85,12 @@ func (eq *Queue) run() {
"event": event,
"sink": eq.dst,
}).WithError(err).Debug("eventqueue: dropped event")
+ } else {
+ if e, ok := event.(*topevents.Envelope); ok && eq.Namespace != "" { // untagged queues own no exit files; skip the decode and the no-op delete
+ if ef := e.ExitFile(); ef != "" {
+ topevents.ExitDelFile(eq.Namespace, ef)
+ }
+ }
}
}
}
--
2.33.0