docker/patch/0130-docker-Handle-blocked-I-O-of-exec-d-process.patch
2019-12-25 19:10:46 +08:00

149 lines
4.3 KiB
Diff

From cd5b236a64426aa7059795afc102110a866df8f0 Mon Sep 17 00:00:00 2001
From: xiadanni1 <xiadanni1@huawei.com>
Date: Tue, 5 Nov 2019 03:52:02 +0800
Subject: [PATCH] docker: [backport] Handle blocked I/O of exec'd processes
reason: help process delete not block forever when the process exits but the I/O was
inherited by a subprocess that lives on.
Cherry-pick from upstream https://github.com/moby/moby/pull/39383
Change-Id: Ibf8afe3fbfb068a6308565ec1059fc9ef5d6d2e2
Signed-off-by: xiadanni1 <xiadanni1@huawei.com>
---
components/engine/container/container.go | 2 +-
components/engine/container/stream/streams.go | 29 ++++++++++++++++++++++++---
components/engine/daemon/exec/exec.go | 3 ++-
components/engine/daemon/monitor.go | 6 ++++--
4 files changed, 33 insertions(+), 7 deletions(-)
diff --git a/components/engine/container/container.go b/components/engine/container/container.go
index c194220..81119a0 100644
--- a/components/engine/container/container.go
+++ b/components/engine/container/container.go
@@ -754,7 +754,7 @@ func (i *rio) Close() error {
}
func (i *rio) Wait() {
- i.sc.Wait()
+ i.sc.Wait(context.Background())
i.IO.Wait()
}
diff --git a/components/engine/container/stream/streams.go b/components/engine/container/stream/streams.go
index d81867c..585f9e8 100644
--- a/components/engine/container/stream/streams.go
+++ b/components/engine/container/stream/streams.go
@@ -1,6 +1,7 @@
package stream // import "github.com/docker/docker/container/stream"
import (
+ "context"
"fmt"
"io"
"io/ioutil"
@@ -24,11 +25,12 @@ import (
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type Config struct {
- sync.WaitGroup
+ wg sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
+ dio *cio.DirectIO
}
// NewConfig creates a stream config and initializes
@@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
+ c.dio = iop
copyFunc := func(w io.Writer, r io.ReadCloser) {
- c.Add(1)
+ c.wg.Add(1)
go func() {
if _, err := pools.Copy(w, r); err != nil {
logrus.Errorf("stream copy error: %v", err)
}
r.Close()
- c.Done()
+ c.wg.Done()
}()
}
@@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
}
}
}
+
+// Wait blocks until the copy goroutines tracked by c.wg (those started by
+// CopyToPipe) have finished. If ctx is done first, it cancels, waits on and
+// closes the DirectIO recorded by CopyToPipe, if any, and then returns.
+func (c *Config) Wait(ctx context.Context) {
+ done := make(chan struct{}, 1)
+ go func() {
+ c.wg.Wait()
+ close(done) // wakes the select below once every tracked copy has called Done
+ }()
+ select {
+ case <-done:
+ case <-ctx.Done(): // the goroutine above may still be blocked in c.wg.Wait() here
+ if c.dio != nil { // c.dio is set by CopyToPipe
+ c.dio.Cancel()
+ c.dio.Wait()
+ c.dio.Close()
+ }
+ }
+}
diff --git a/components/engine/daemon/exec/exec.go b/components/engine/daemon/exec/exec.go
index c036c46..08fc87c 100644
--- a/components/engine/daemon/exec/exec.go
+++ b/components/engine/daemon/exec/exec.go
@@ -1,6 +1,7 @@
package exec // import "github.com/docker/docker/daemon/exec"
import (
+ "context"
"runtime"
"sync"
@@ -58,7 +59,7 @@ func (i *rio) Close() error {
}
func (i *rio) Wait() {
- i.sc.Wait()
+ i.sc.Wait(context.Background())
i.IO.Wait()
}
diff --git a/components/engine/daemon/monitor.go b/components/engine/daemon/monitor.go
index 7ae85f5..e041bd5 100644
--- a/components/engine/daemon/monitor.go
+++ b/components/engine/daemon/monitor.go
@@ -56,7 +56,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerd.EventType, ei libc
logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
}
- c.StreamConfig.Wait()
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ c.StreamConfig.Wait(ctx)
+ cancel() // release the timeout context's timer now instead of when the 2s deadline fires
c.Reset(false)
exitStatus := container.ExitStatus{
@@ -121,7 +123,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerd.EventType, ei libc
defer execConfig.Unlock()
execConfig.ExitCode = &ec
execConfig.Running = false
- execConfig.StreamConfig.Wait()
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ execConfig.StreamConfig.Wait(ctx)
+ cancel() // release the timeout context's timer now instead of when the 2s deadline fires
if err := execConfig.CloseStreams(); err != nil {
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
}
--
1.8.3.1