containerd/patch/0020-events-resend-pending-exit-events-on-restore.patch
2019-12-30 12:24:38 +08:00

358 lines
11 KiB
Diff

From 27762e8d75c00c8898c725873c17a23105ba5b7c Mon Sep 17 00:00:00 2001
From: jingrui <jingrui@huawei.com>
Date: Tue, 12 Feb 2019 17:03:11 +0800
Subject: [PATCH 20/27] events: resend pending exit events on restore
reason: fix exit event may lost.
testCE_docker_containerd_ABN.026.sh
Change-Id: I5bcdf06ad4ee7b8a0ca782e610186f52e3d79bbd
Signed-off-by: jingrui <jingrui@huawei.com>
---
events/events.go | 13 +++++
events/exchange/exchange.go | 12 +++++
events/exit.go | 79 +++++++++++++++++++++++++++++
runtime/v1/linux/runtime.go | 56 +++++++++++++++++---
runtime/v1/linux/task.go | 10 ++--
runtime/v1/shim/service.go | 2 +
vendor/github.com/docker/go-events/queue.go | 8 +++
7 files changed, 167 insertions(+), 13 deletions(-)
create mode 100644 events/exit.go
diff --git a/events/events.go b/events/events.go
index b7eb86f..aa07236 100644
--- a/events/events.go
+++ b/events/events.go
@@ -22,6 +22,7 @@ import (
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
+ apievents "github.com/containerd/containerd/api/events"
)
// Envelope provides the packaging for an event.
@@ -32,6 +33,18 @@ type Envelope struct {
Event *types.Any
}
+func (e *Envelope) ExitFile() string {
+ decoded, err := typeurl.UnmarshalAny(e.Event)
+ if err != nil {
+ return ""
+ }
+
+ if e, ok := decoded.(*apievents.TaskExit); ok {
+ return ExitFile(e.ContainerID, e.Pid, e.ExitStatus)
+ }
+
+ return ""
+}
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (e *Envelope) Field(fieldpath []string) (string, bool) {
diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go
index 95d21b7..540f180 100644
--- a/events/exchange/exchange.go
+++ b/events/exchange/exchange.go
@@ -49,6 +49,11 @@ func NewExchange() *Exchange {
var _ events.Publisher = &Exchange{}
var _ events.Forwarder = &Exchange{}
var _ events.Subscriber = &Exchange{}
+var mobySubcribed = false
+
+func MobySubscribed() bool {
+ return mobySubcribed
+}
// Forward accepts an envelope to be direcly distributed on the exchange.
//
@@ -161,6 +166,13 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
}
e.broadcaster.Add(dst)
+ logrus.Infof("subscribe ctx=%v fs=%v", ctx, fs)
+ for _, s := range fs {
+ if !MobySubscribed() && s == "namespace==moby,topic~=|^/tasks/|" {
+ queue.Namespace = "moby"
+ mobySubcribed = true
+ }
+ }
go func() {
defer closeAll()
diff --git a/events/exit.go b/events/exit.go
new file mode 100644
index 0000000..e1ce089
--- /dev/null
+++ b/events/exit.go
@@ -0,0 +1,79 @@
+package events
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "github.com/sirupsen/logrus"
+)
+
+const ExitDir = "/var/run/docker/containerd/exit"
+const ExitStatusDefault = 137
+
+func ExitFile(cid string, pid uint32, status uint32) string {
+ return fmt.Sprintf("%s.%d.%d", cid, pid, status)
+}
+
+func ExitInfo(ef string) (string, uint32, uint32) {
+ s := strings.Split(ef, ".")
+ if len(s) != 3 {
+ return "", 0, 0
+ }
+
+ cid := s[0]
+ pid, err := strconv.ParseUint(s[1], 10, 32)
+ if err != nil {
+ return "", 0, 0
+ }
+ status, err := strconv.ParseUint(s[2], 10, 32)
+ if err != nil {
+ return "", 0, 0
+ }
+
+ return cid, uint32(pid), uint32(status)
+}
+
+func ExitAddFile(ns string, ef string, reason string) {
+ os.MkdirAll(filepath.Join(ExitDir, ns), 0700)
+ err := ioutil.WriteFile(filepath.Join(ExitDir, ns, ef), []byte{}, 0600)
+ logrus.Infof("exit-add %s/%s [reason: %s] error=%v", ns, ef, reason, err)
+}
+
+func ExitDelFile(ns string, ef string) {
+ err := os.RemoveAll(filepath.Join(ExitDir, ns, ef))
+ logrus.Infof("exit-del %s/%s error=%v", ns, ef, err)
+}
+
+func ExitGetFile(ns string, cid string, pid uint32, status uint32) string {
+ ef := ExitFile(cid, pid, status)
+ if _, err := os.Stat(filepath.Join(ExitDir, ns, ef)); err == nil {
+ return ef
+ }
+ return ""
+}
+
+func ExitGetFiles(ns string) []string {
+ files, err := ioutil.ReadDir(filepath.Join(ExitDir, ns))
+ if err != nil {
+ return []string{}
+ }
+
+ names := []string{}
+ for _, f := range files {
+ names = append(names, f.Name())
+ }
+
+ return names
+}
+
+func ExitPending(ns string, cid string, pid uint32) bool {
+ for _, ef := range ExitGetFiles(ns) {
+ if strings.Contains(ef, fmt.Sprintf("%s.%d", cid, pid)) {
+ return true
+ }
+ }
+ return false
+}
diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go
index 477cda0..add4d52 100644
--- a/runtime/v1/linux/runtime.go
+++ b/runtime/v1/linux/runtime.go
@@ -31,6 +31,7 @@ import (
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
+ "github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
@@ -129,6 +130,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
}
+ go r.resendExitEvents(ic.Context, "moby")
return r, nil
}
@@ -175,7 +177,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}
defer func() {
if err != nil {
- bundle.Delete()
+ errd := bundle.Delete()
+ log.G(ctx).WithError(err).Errorf("revert: delete bundle error=%v", errd)
}
}()
@@ -218,9 +221,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}
defer func() {
if err != nil {
- if kerr := s.KillShim(ctx); kerr != nil {
- log.G(ctx).WithError(err).Error("failed to kill shim")
- }
+ kerr := s.KillShim(ctx)
+ log.G(ctx).WithError(err).Errorf("revert: kill shim error=%v", kerr)
}
}()
@@ -305,6 +307,41 @@ func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
return r.tasks.Get(ctx, id)
}
+func (r *Runtime) resendExitEvents(ctx context.Context, ns string) {
+ for {
+ time.Sleep(time.Second)
+ efs := events.ExitGetFiles(ns)
+ if len(efs) == 0 {
+ break
+ }
+
+ if !exchange.MobySubscribed() {
+ logrus.Infof("waiting moby event stream ...")
+ continue
+ }
+ time.Sleep(time.Second)
+
+ for _, ef := range efs {
+ cid, pid, status := events.ExitInfo(ef)
+ if cid == "" {
+ continue
+ }
+
+ e := &eventstypes.TaskExit{
+ ContainerID: cid,
+ ID: cid,
+ ExitStatus: status,
+ ExitedAt: time.Now().UTC(),
+ Pid: uint32(pid),
+ }
+
+ ctx := namespaces.WithNamespace(context.Background(), ns)
+ err := r.events.Publish(ctx, runtime.TaskExitEventTopic, e)
+ logrus.Infof("resend exit event %v error=%v", e, err)
+ }
+ }
+}
+
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := ioutil.ReadDir(filepath.Join(r.state, ns))
if err != nil {
@@ -388,13 +425,16 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
log.G(ctx).WithError(err).Error("loading task type")
continue
}
- if pid == -1 {
- _, err := t.DeleteForce(ctx)
- log.G(ctx).Warnf("delete force %s Pid=-1 error=%v", id, err)
+ if pid <= 0 {
+ _, err := t.DeleteForce(ctx, 0)
+ log.G(ctx).Warnf("delete force %s Pid=%d error=%v", id, pid, err)
continue
}
if _, err := os.Stat(filepath.Join(bundle.path, proc.InitExit)); err == nil {
- _, err := t.DeleteForce(ctx)
+ if !events.ExitPending(ns, t.id, uint32(pid)) {
+ events.ExitAddFile(ns, events.ExitFile(t.id, uint32(pid), uint32(events.ExitStatusDefault)), "cleanup dirty task")
+ }
+ _, err := t.DeleteForce(ctx, uint32(pid))
log.G(ctx).Warnf("delete force %s Pid=%d(exiting) error=%v", id, pid, err)
continue
}
diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go
index 6995156..b692ae7 100644
--- a/runtime/v1/linux/task.go
+++ b/runtime/v1/linux/task.go
@@ -88,7 +88,7 @@ func (t *Task) Namespace() string {
}
// Delete the task and return the exit status
-func (t *Task) delete(ctx context.Context, force bool) (*runtime.Exit, error) {
+func (t *Task) delete(ctx context.Context, force bool, pid uint32) (*runtime.Exit, error) {
rsp, err := t.shim.Delete(ctx, empty)
if err != nil {
log.G(ctx).WithError(err).Error("failed to delete container, force=%t", force)
@@ -108,7 +108,7 @@ func (t *Task) delete(ctx context.Context, force bool) (*runtime.Exit, error) {
rsp = &shim.DeleteResponse{}
rsp.ExitStatus = 128 + uint32(unix.SIGKILL)
rsp.ExitedAt = time.Now().UTC()
- rsp.Pid = 0
+ rsp.Pid = pid
}
t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
@@ -126,11 +126,11 @@ func (t *Task) delete(ctx context.Context, force bool) (*runtime.Exit, error) {
// Delete the task and return the exit status
func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
- return t.delete(ctx, false)
+ return t.delete(ctx, false, 0)
}
-func (t *Task) DeleteForce(ctx context.Context) (*runtime.Exit, error) {
- return t.delete(ctx, true)
+func (t *Task) DeleteForce(ctx context.Context, pid uint32) (*runtime.Exit, error) {
+ return t.delete(ctx, true, pid)
}
// Start the task
diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go
index 8c7984f..a2eb35b 100644
--- a/runtime/v1/shim/service.go
+++ b/runtime/v1/shim/service.go
@@ -505,6 +505,8 @@ func (s *Service) checkProcesses(e runc.Exit) {
for _, p := range s.processes {
if p.Pid() == e.Pid {
if ip, ok := p.(*proc.Init); ok {
+ ns := filepath.Base(filepath.Dir(ip.Bundle))
+ events.ExitAddFile(ns, events.ExitFile(s.id, uint32(e.Pid), uint32(e.Status)), "init exited")
ioutil.WriteFile(filepath.Join(ip.Bundle, proc.InitExit), []byte(fmt.Sprintf("%d", e.Pid)), 0600)
}
if shouldKillAll {
diff --git a/vendor/github.com/docker/go-events/queue.go b/vendor/github.com/docker/go-events/queue.go
index 4bb770a..0608e7e 100644
--- a/vendor/github.com/docker/go-events/queue.go
+++ b/vendor/github.com/docker/go-events/queue.go
@@ -5,12 +5,14 @@ import (
"sync"
"github.com/sirupsen/logrus"
+ topevents "github.com/containerd/containerd/events"
)
// Queue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type Queue struct {
+ Namespace string
dst Sink
events *list.List
cond *sync.Cond
@@ -83,6 +85,12 @@ func (eq *Queue) run() {
"event": event,
"sink": eq.dst,
}).WithError(err).Debug("eventqueue: dropped event")
+ } else {
+ if e, ok := event.(*topevents.Envelope); ok {
+ if ef := e.ExitFile(); ef != "" {
+ topevents.ExitDelFile(eq.Namespace, ef)
+ }
+ }
}
}
}
--
2.7.4.3