From 27762e8d75c00c8898c725873c17a23105ba5b7c Mon Sep 17 00:00:00 2001 From: jingrui Date: Tue, 12 Feb 2019 17:03:11 +0800 Subject: [PATCH 20/27] events: resend pending exit events on restore reason: fix exit event may lost. testCE_docker_containerd_ABN.026.sh Change-Id: I5bcdf06ad4ee7b8a0ca782e610186f52e3d79bbd Signed-off-by: jingrui --- events/events.go | 13 +++++ events/exchange/exchange.go | 12 +++++ events/exit.go | 79 +++++++++++++++++++++++++++++ runtime/v1/linux/runtime.go | 56 +++++++++++++++++--- runtime/v1/linux/task.go | 10 ++-- runtime/v1/shim/service.go | 2 + vendor/github.com/docker/go-events/queue.go | 8 +++ 7 files changed, 167 insertions(+), 13 deletions(-) create mode 100644 events/exit.go diff --git a/events/events.go b/events/events.go index b7eb86f..aa07236 100644 --- a/events/events.go +++ b/events/events.go @@ -22,6 +22,7 @@ import ( "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" + apievents "github.com/containerd/containerd/api/events" ) // Envelope provides the packaging for an event. @@ -32,6 +33,18 @@ type Envelope struct { Event *types.Any } +func (e *Envelope) ExitFile() string { + decoded, err := typeurl.UnmarshalAny(e.Event) + if err != nil { + return "" + } + + if e, ok := decoded.(*apievents.TaskExit); ok { + return ExitFile(e.ContainerID, e.Pid, e.ExitStatus) + } + + return "" +} // Field returns the value for the given fieldpath as a string, if defined. // If the value is not defined, the second value will be false. func (e *Envelope) Field(fieldpath []string) (string, bool) { diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go index 95d21b7..540f180 100644 --- a/events/exchange/exchange.go +++ b/events/exchange/exchange.go @@ -49,6 +49,11 @@ func NewExchange() *Exchange { var _ events.Publisher = &Exchange{} var _ events.Forwarder = &Exchange{} var _ events.Subscriber = &Exchange{} +var mobySubcribed = false + +func MobySubscribed() bool { + return mobySubcribed +} // Forward accepts an envelope to be direcly distributed on the exchange. // @@ -161,6 +166,13 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even } e.broadcaster.Add(dst) + logrus.Infof("subscribe ctx=%v fs=%v", ctx, fs) + for _, s := range fs { + if !MobySubscribed() && s == "namespace==moby,topic~=|^/tasks/|" { + queue.Namespace = "moby" + mobySubcribed = true + } + } go func() { defer closeAll() diff --git a/events/exit.go b/events/exit.go new file mode 100644 index 0000000..e1ce089 --- /dev/null +++ b/events/exit.go @@ -0,0 +1,79 @@ +package events + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "github.com/sirupsen/logrus" +) + +const ExitDir = "/var/run/docker/containerd/exit" +const ExitStatusDefault = 137 + +func ExitFile(cid string, pid uint32, status uint32) string { + return fmt.Sprintf("%s.%d.%d", cid, pid, status) +} + +func ExitInfo(ef string) (string, uint32, uint32) { + s := strings.Split(ef, ".") + if len(s) != 3 { + return "", 0, 0 + } + + cid := s[0] + pid, err := strconv.ParseUint(s[1], 10, 32) + if err != nil { + return "", 0, 0 + } + status, err := strconv.ParseUint(s[2], 10, 32) + if err != nil { + return "", 0, 0 + } + + return cid, uint32(pid), uint32(status) +} + +func ExitAddFile(ns string, ef string, reason string) { + os.MkdirAll(filepath.Join(ExitDir, ns), 0700) + err := ioutil.WriteFile(filepath.Join(ExitDir, ns, ef), []byte{}, 0600) + logrus.Infof("exit-add %s/%s [reason: %s] error=%v", ns, ef, reason, err) +} + +func ExitDelFile(ns string, ef string) { + err := os.RemoveAll(filepath.Join(ExitDir, ns, ef)) + logrus.Infof("exit-del %s/%s error=%v", ns, ef, err) +} + +func ExitGetFile(ns string, cid string, pid uint32, status uint32) string { + ef := ExitFile(cid, pid, status) + if _, err := os.Stat(filepath.Join(ExitDir, ns, ef)); err == nil { + return ef + } + return "" +} + +func ExitGetFiles(ns string) []string { + files, err := ioutil.ReadDir(filepath.Join(ExitDir, ns)) + if err != nil { + return []string{} + } + + names := []string{} + for _, f := range files { + names = append(names, f.Name()) + } + + return names +} + +func ExitPending(ns string, cid string, pid uint32) bool { + for _, ef := range ExitGetFiles(ns) { + if strings.Contains(ef, fmt.Sprintf("%s.%d", cid, pid)) { + return true + } + } + return false +} diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index 477cda0..add4d52 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" @@ -129,6 +130,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { return nil, err } } + go r.resendExitEvents(ic.Context, "moby") return r, nil } @@ -175,7 +177,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } defer func() { if err != nil { - bundle.Delete() + errd := bundle.Delete() + log.G(ctx).WithError(err).Errorf("revert: delete bundle error=%v", errd) } }() @@ -218,9 +221,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } defer func() { if err != nil { - if kerr := s.KillShim(ctx); kerr != nil { - log.G(ctx).WithError(err).Error("failed to kill shim") - } + kerr := s.KillShim(ctx) + log.G(ctx).WithError(err).Errorf("revert: kill shim error=%v", kerr) } }() @@ -305,6 +307,41 @@ func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) { return r.tasks.Get(ctx, id) } +func (r *Runtime) resendExitEvents(ctx context.Context, ns string) { + for { + time.Sleep(time.Second) + efs := events.ExitGetFiles(ns) + if len(efs) == 0 { + break + } + + if !exchange.MobySubscribed() { + logrus.Infof("waiting moby event stream ...") + continue + } + time.Sleep(time.Second) + + for _, ef := range efs { + cid, pid, status := events.ExitInfo(ef) + if cid == "" { + continue + } + + e := &eventstypes.TaskExit{ + ContainerID: cid, + ID: cid, + ExitStatus: status, + ExitedAt: time.Now().UTC(), + Pid: uint32(pid), + } + + ctx := namespaces.WithNamespace(context.Background(), ns) + err := r.events.Publish(ctx, runtime.TaskExitEventTopic, e) + logrus.Infof("resend exit event %v error=%v", e, err) + } + } +} + func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { dir, err := ioutil.ReadDir(filepath.Join(r.state, ns)) if err != nil { @@ -388,13 +425,16 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { log.G(ctx).WithError(err).Error("loading task type") continue } - if pid == -1 { - _, err := t.DeleteForce(ctx) - log.G(ctx).Warnf("delete force %s Pid=-1 error=%v", id, err) + if pid <= 0 { + _, err := t.DeleteForce(ctx, 0) + log.G(ctx).Warnf("delete force %s Pid=%d error=%v", id, pid, err) continue } if _, err := os.Stat(filepath.Join(bundle.path, proc.InitExit)); err == nil { - _, err := t.DeleteForce(ctx) + if !events.ExitPending(ns, t.id, uint32(pid)) { + events.ExitAddFile(ns, events.ExitFile(t.id, uint32(pid), uint32(events.ExitStatusDefault)), "cleanup dirty task") + } + _, err := t.DeleteForce(ctx, uint32(pid)) log.G(ctx).Warnf("delete force %s Pid=%d(exiting) error=%v", id, pid, err) continue } diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go index 6995156..b692ae7 100644 --- a/runtime/v1/linux/task.go +++ b/runtime/v1/linux/task.go @@ -88,7 +88,7 @@ func (t *Task) Namespace() string { } // Delete the task and return the exit status -func (t *Task) delete(ctx context.Context, force bool) (*runtime.Exit, error) { +func (t *Task) delete(ctx context.Context, force bool, pid uint32) (*runtime.Exit, error) { rsp, err := t.shim.Delete(ctx, empty) if err != nil { log.G(ctx).WithError(err).Error("failed to delete container, force=%t", force) @@ -108,7 +108,7 @@ func (t *Task) delete(ctx context.Context, force bool) (*runtime.Exit, error) { rsp = &shim.DeleteResponse{} rsp.ExitStatus = 128 + uint32(unix.SIGKILL) rsp.ExitedAt = time.Now().UTC() - rsp.Pid = 0 + rsp.Pid = pid } t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ @@ -126,11 +126,11 @@ func (t *Task) delete(ctx context.Context, force bool) (*runtime.Exit, error) { // Delete the task and return the exit status func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { - return t.delete(ctx, false) + return t.delete(ctx, false, 0) } -func (t *Task) DeleteForce(ctx context.Context) (*runtime.Exit, error) { - return t.delete(ctx, true) +func (t *Task) DeleteForce(ctx context.Context, pid uint32) (*runtime.Exit, error) { + return t.delete(ctx, true, pid) } // Start the task diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go index 8c7984f..a2eb35b 100644 --- a/runtime/v1/shim/service.go +++ b/runtime/v1/shim/service.go @@ -505,6 +505,8 @@ func (s *Service) checkProcesses(e runc.Exit) { for _, p := range s.processes { if p.Pid() == e.Pid { if ip, ok := p.(*proc.Init); ok { + ns := filepath.Base(filepath.Dir(ip.Bundle)) + events.ExitAddFile(ns, events.ExitFile(s.id, uint32(e.Pid), uint32(e.Status)), "init exited") ioutil.WriteFile(filepath.Join(ip.Bundle, proc.InitExit), []byte(fmt.Sprintf("%d", e.Pid)), 0600) } if shouldKillAll { diff --git a/vendor/github.com/docker/go-events/queue.go b/vendor/github.com/docker/go-events/queue.go index 4bb770a..0608e7e 100644 --- a/vendor/github.com/docker/go-events/queue.go +++ b/vendor/github.com/docker/go-events/queue.go @@ -5,12 +5,14 @@ import ( "sync" "github.com/sirupsen/logrus" + topevents "github.com/containerd/containerd/events" ) // Queue accepts all messages into a queue for asynchronous consumption // by a sink. It is unbounded and thread safe but the sink must be reliable or // events will be dropped. type Queue struct { + Namespace string dst Sink events *list.List cond *sync.Cond @@ -83,6 +85,12 @@ func (eq *Queue) run() { "event": event, "sink": eq.dst, }).WithError(err).Debug("eventqueue: dropped event") + } else { + if e, ok := event.(*topevents.Envelope); ok { + if ef := e.ExitFile(); ef != "" { + topevents.ExitDelFile(eq.Namespace, ef) + } + } } } } -- 2.7.4.3