149 lines
4.3 KiB
Diff
149 lines
4.3 KiB
Diff
From cd5b236a64426aa7059795afc102110a866df8f0 Mon Sep 17 00:00:00 2001
|
|
From: xiadanni1 <xiadanni1@huawei.com>
|
|
Date: Tue, 5 Nov 2019 03:52:02 +0800
|
|
Subject: [PATCH] docker: [backport] Handle blocked I/O of exec'd processes
|
|
|
|
reason: help process delete not block forever when the process exists but the I/O was
|
|
inherited by a subprocess that lives on.
|
|
|
|
Cherry-pick from upstream https://github.com/moby/moby/pull/39383
|
|
|
|
Change-Id: Ibf8afe3fbfb068a6308565ec1059fc9ef5d6d2e2
|
|
Signed-off-by: xiadanni1 <xiadanni1@huawei.com>
|
|
---
|
|
components/engine/container/container.go | 2 +-
|
|
components/engine/container/stream/streams.go | 29 ++++++++++++++++++++++++---
|
|
components/engine/daemon/exec/exec.go | 3 ++-
|
|
components/engine/daemon/monitor.go | 6 ++++--
|
|
4 files changed, 33 insertions(+), 7 deletions(-)
|
|
|
|
diff --git a/components/engine/container/container.go b/components/engine/container/container.go
|
|
index c194220..81119a0 100644
|
|
--- a/components/engine/container/container.go
|
|
+++ b/components/engine/container/container.go
|
|
@@ -754,7 +754,7 @@ func (i *rio) Close() error {
|
|
}
|
|
|
|
func (i *rio) Wait() {
|
|
- i.sc.Wait()
|
|
+ i.sc.Wait(context.Background())
|
|
|
|
i.IO.Wait()
|
|
}
|
|
diff --git a/components/engine/container/stream/streams.go b/components/engine/container/stream/streams.go
|
|
index d81867c..585f9e8 100644
|
|
--- a/components/engine/container/stream/streams.go
|
|
+++ b/components/engine/container/stream/streams.go
|
|
@@ -1,6 +1,7 @@
|
|
package stream // import "github.com/docker/docker/container/stream"
|
|
|
|
import (
|
|
+ "context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
@@ -24,11 +25,12 @@ import (
|
|
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
|
|
// a kind of "broadcaster".
|
|
type Config struct {
|
|
- sync.WaitGroup
|
|
+ wg sync.WaitGroup
|
|
stdout *broadcaster.Unbuffered
|
|
stderr *broadcaster.Unbuffered
|
|
stdin io.ReadCloser
|
|
stdinPipe io.WriteCloser
|
|
+ dio *cio.DirectIO
|
|
}
|
|
|
|
// NewConfig creates a stream config and initializes
|
|
@@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {
|
|
|
|
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
|
|
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
|
|
+ c.dio = iop
|
|
copyFunc := func(w io.Writer, r io.ReadCloser) {
|
|
- c.Add(1)
|
|
+ c.wg.Add(1)
|
|
go func() {
|
|
if _, err := pools.Copy(w, r); err != nil {
|
|
logrus.Errorf("stream copy error: %v", err)
|
|
}
|
|
r.Close()
|
|
- c.Done()
|
|
+ c.wg.Done()
|
|
}()
|
|
}
|
|
|
|
@@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
|
|
}
|
|
}
|
|
}
|
|
+
|
|
+// Wait for the stream to close
|
|
+// Wait supports timeouts via the context to unblock and forcefully
|
|
+// close the io streams
|
|
+func (c *Config) Wait(ctx context.Context) {
|
|
+ done := make(chan struct{}, 1)
|
|
+ go func() {
|
|
+ c.wg.Wait()
|
|
+ close(done)
|
|
+ }()
|
|
+ select {
|
|
+ case <-done:
|
|
+ case <-ctx.Done():
|
|
+ if c.dio != nil {
|
|
+ c.dio.Cancel()
|
|
+ c.dio.Wait()
|
|
+ c.dio.Close()
|
|
+ }
|
|
+ }
|
|
+}
|
|
diff --git a/components/engine/daemon/exec/exec.go b/components/engine/daemon/exec/exec.go
|
|
index c036c46..08fc87c 100644
|
|
--- a/components/engine/daemon/exec/exec.go
|
|
+++ b/components/engine/daemon/exec/exec.go
|
|
@@ -1,6 +1,7 @@
|
|
package exec // import "github.com/docker/docker/daemon/exec"
|
|
|
|
import (
|
|
+ "context"
|
|
"runtime"
|
|
"sync"
|
|
|
|
@@ -58,7 +59,7 @@ func (i *rio) Close() error {
|
|
}
|
|
|
|
func (i *rio) Wait() {
|
|
- i.sc.Wait()
|
|
+ i.sc.Wait(context.Background())
|
|
|
|
i.IO.Wait()
|
|
}
|
|
diff --git a/components/engine/daemon/monitor.go b/components/engine/daemon/monitor.go
|
|
index 7ae85f5..e041bd5 100644
|
|
--- a/components/engine/daemon/monitor.go
|
|
+++ b/components/engine/daemon/monitor.go
|
|
@@ -56,7 +56,8 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerd.EventType, ei libc
|
|
logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
|
|
}
|
|
|
|
- c.StreamConfig.Wait()
|
|
+ ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
|
|
+ c.StreamConfig.Wait(ctx)
|
|
c.Reset(false)
|
|
|
|
exitStatus := container.ExitStatus{
|
|
@@ -121,7 +122,8 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerd.EventType, ei libc
|
|
defer execConfig.Unlock()
|
|
execConfig.ExitCode = &ec
|
|
execConfig.Running = false
|
|
- execConfig.StreamConfig.Wait()
|
|
+ ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
|
|
+ execConfig.StreamConfig.Wait(ctx)
|
|
if err := execConfig.CloseStreams(); err != nil {
|
|
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
|
|
}
|
|
--
|
|
1.8.3.1
|
|
|