From cd5b236a64426aa7059795afc102110a866df8f0 Mon Sep 17 00:00:00 2001 From: xiadanni1 Date: Tue, 5 Nov 2019 03:52:02 +0800 Subject: [PATCH] docker: [backport] Handle blocked I/O of exec'd processes reason: help process delete not block forever when the process exists but the I/O was inherited by a subprocess that lives on. Cherry-pick from upstream https://github.com/moby/moby/pull/39383 Change-Id: Ibf8afe3fbfb068a6308565ec1059fc9ef5d6d2e2 Signed-off-by: xiadanni1 --- components/engine/container/container.go | 2 +- components/engine/container/stream/streams.go | 29 ++++++++++++++++++++++++--- components/engine/daemon/exec/exec.go | 3 ++- components/engine/daemon/monitor.go | 6 ++++-- 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/components/engine/container/container.go b/components/engine/container/container.go index c194220..81119a0 100644 --- a/components/engine/container/container.go +++ b/components/engine/container/container.go @@ -754,7 +754,7 @@ func (i *rio) Close() error { } func (i *rio) Wait() { - i.sc.Wait() + i.sc.Wait(context.Background()) i.IO.Wait() } diff --git a/components/engine/container/stream/streams.go b/components/engine/container/stream/streams.go index d81867c..585f9e8 100644 --- a/components/engine/container/stream/streams.go +++ b/components/engine/container/stream/streams.go @@ -1,6 +1,7 @@ package stream // import "github.com/docker/docker/container/stream" import ( + "context" "fmt" "io" "io/ioutil" @@ -24,11 +25,12 @@ import ( // copied and delivered to all StdoutPipe and StderrPipe consumers, using // a kind of "broadcaster". type Config struct { - sync.WaitGroup + wg sync.WaitGroup stdout *broadcaster.Unbuffered stderr *broadcaster.Unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser + dio *cio.DirectIO } // NewConfig creates a stream config and initializes @@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error { // CopyToPipe connects streamconfig with a libcontainerd.IOPipe func (c *Config) CopyToPipe(iop *cio.DirectIO) { + c.dio = iop copyFunc := func(w io.Writer, r io.ReadCloser) { - c.Add(1) + c.wg.Add(1) go func() { if _, err := pools.Copy(w, r); err != nil { logrus.Errorf("stream copy error: %v", err) } r.Close() - c.Done() + c.wg.Done() }() } @@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) { } } } + +// Wait for the stream to close +// Wait supports timeouts via the context to unblock and forcefully +// close the io streams +func (c *Config) Wait(ctx context.Context) { + done := make(chan struct{}, 1) + go func() { + c.wg.Wait() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + if c.dio != nil { + c.dio.Cancel() + c.dio.Wait() + c.dio.Close() + } + } +} diff --git a/components/engine/daemon/exec/exec.go b/components/engine/daemon/exec/exec.go index c036c46..08fc87c 100644 --- a/components/engine/daemon/exec/exec.go +++ b/components/engine/daemon/exec/exec.go @@ -1,6 +1,7 @@ package exec // import "github.com/docker/docker/daemon/exec" import ( + "context" "runtime" "sync" @@ -58,7 +59,7 @@ func (i *rio) Close() error { } func (i *rio) Wait() { - i.sc.Wait() + i.sc.Wait(context.Background()) i.IO.Wait() } diff --git a/components/engine/daemon/monitor.go b/components/engine/daemon/monitor.go index 7ae85f5..e041bd5 100644 --- a/components/engine/daemon/monitor.go +++ b/components/engine/daemon/monitor.go @@ -56,7 +56,8 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerd.EventType, ei libc logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID) } - c.StreamConfig.Wait() + ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) + c.StreamConfig.Wait(ctx) c.Reset(false) exitStatus := container.ExitStatus{ @@ -121,7 +122,8 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerd.EventType, ei libc defer execConfig.Unlock() execConfig.ExitCode = &ec execConfig.Running = false - execConfig.StreamConfig.Wait() + ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) + execConfig.StreamConfig.Wait(ctx) if err := execConfig.CloseStreams(); err != nil { logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err) } -- 1.8.3.1