containerd/patch/0008-runtime-Use-named-pipes-for-shim-logs.patch

579 lines
16 KiB
Diff
Raw Normal View History

2019-12-30 12:24:38 +08:00
From 9bdd5d485c6796c44356ae9482df8de467463feb Mon Sep 17 00:00:00 2001
From: lujingxiao <lujingxiao@huawei.com>
Date: Wed, 23 Jan 2019 14:57:41 +0800
Subject: [PATCH 08/27] runtime: Use named pipes for shim logs
reason: TestDaemonRestart hangs if shim_debug is enabled
Relating to issue [#2606](https://github.com/containerd/containerd/issues/2606)
Co-authored-by: Oliver Stenbom <ostenbom@pivotal.io>
Co-authored-by: Georgi Sabev <georgethebeatle@gmail.com>
Co-authored-by: Giuseppe Capizzi <gcapizzi@pivotal.io>
Co-authored-by: Danail Branekov <danailster@gmail.com>
Cherry-pick from upstream 1d4105cacf
Change-Id: I0038401dda88c234750e8d1378a4dd97230400b0
Signed-off-by: Oliver Stenbom <ostenbom@pivotal.io>
Signed-off-by: Georgi Sabev <georgethebeatle@gmail.com>
Signed-off-by: Giuseppe Capizzi <gcapizzi@pivotal.io>
Signed-off-by: Danail Branekov <danailster@gmail.com>
Signed-off-by: lujingxiao <lujingxiao@huawei.com>
---
client_test.go | 49 +++++++--
cmd/containerd-shim/main_unix.go | 28 ++++++
container_linux_test.go | 209 +++++++++++++++++++++++++++++++++++++++
runtime/v1/linux/runtime.go | 26 +++++
runtime/v1/shim.go | 38 +++++++
runtime/v1/shim/client/client.go | 34 +++++--
6 files changed, 370 insertions(+), 14 deletions(-)
create mode 100644 runtime/v1/shim.go
diff --git a/client_test.go b/client_test.go
index a6b1d59..1a4cf39 100644
--- a/client_test.go
+++ b/client_test.go
@@ -21,6 +21,8 @@ import (
"context"
"flag"
"fmt"
+ "io"
+ "io/ioutil"
"os"
"os/exec"
"testing"
@@ -36,11 +38,12 @@ import (
)
var (
- address string
- noDaemon bool
- noCriu bool
- supportsCriu bool
- testNamespace = "testing"
+ address string
+ noDaemon bool
+ noCriu bool
+ supportsCriu bool
+ testNamespace = "testing"
+ ctrdStdioFilePath string
ctrd = &daemon{}
)
@@ -76,13 +79,28 @@ func TestMain(m *testing.M) {
if !noDaemon {
sys.ForceRemoveAll(defaultRoot)
- err := ctrd.start("containerd", address, []string{
+ stdioFile, err := ioutil.TempFile("", "")
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "could not create a new stdio temp file: %s\n", err)
+ os.Exit(1)
+ }
+ // NOTE(review): this deferred cleanup does not run on the os.Exit paths below.
+ defer func() {
+ stdioFile.Close()
+ os.Remove(stdioFile.Name())
+ }()
+ ctrdStdioFilePath = stdioFile.Name()
+ // Send containerd's stdout/stderr to both buf and the temp file so tests can read it via ctrdStdioFilePath.
+ stdioWriter := io.MultiWriter(stdioFile, buf)
+
+ err = ctrd.start("containerd", address, []string{
"--root", defaultRoot,
"--state", defaultState,
"--log-level", "debug",
- }, buf, buf)
+ "--config", createShimDebugConfig(),
+ }, stdioWriter, stdioWriter)
if err != nil {
- fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
+ fmt.Fprintf(os.Stderr, "%s: %s\n", err, buf.String())
os.Exit(1)
}
}
@@ -137,6 +155,7 @@ func TestMain(m *testing.M) {
fmt.Fprintln(os.Stderr, "failed to wait for containerd", err)
}
}
+
if err := sys.ForceRemoveAll(defaultRoot); err != nil {
fmt.Fprintln(os.Stderr, "failed to remove test root dir", err)
os.Exit(1)
@@ -343,3 +362,22 @@ func TestClientReconnect(t *testing.T) {
t.Errorf("client closed returned error %v", err)
}
}
+
+// createShimDebugConfig writes a temporary containerd config that sets
+// shim_debug = true under [plugins.linux] and returns its path. It exits the
+// process on failure; the file is not removed here.
+func createShimDebugConfig() string {
+ f, err := ioutil.TempFile("", "containerd-config-")
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to create config file: %s\n", err)
+ os.Exit(1)
+ }
+ defer f.Close()
+
+ if _, err := f.WriteString("[plugins.linux]\n\tshim_debug = true\n"); err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to write to config file %s: %s\n", f.Name(), err)
+ os.Exit(1)
+ }
+
+ return f.Name()
+}
diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go
index ca0a90a..6c59cd1 100644
--- a/cmd/containerd-shim/main_unix.go
+++ b/cmd/containerd-shim/main_unix.go
@@ -23,6 +23,7 @@ import (
"context"
"flag"
"fmt"
+ "io"
"net"
"os"
"os/exec"
@@ -36,6 +37,7 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
+ shimlog "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
@@ -92,12 +94,40 @@ func main() {
runtime.GOMAXPROCS(2)
}
+ stdout, stderr, err := openStdioKeepAlivePipes(workdirFlag)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
+ os.Exit(1)
+ }
+ defer func() {
+ stdout.Close()
+ stderr.Close()
+ }()
+
if err := executeShim(); err != nil {
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
os.Exit(1)
}
}
+// openStdioKeepAlivePipes opens the shim stdout and stderr log fifos in dir.
+// If the containerd server process dies, the shim still holds these FDs, so
+// Linux does not SIGPIPE the shim process when it writes to its end of the
+// pipes. If either fifo cannot be opened, any fifo already opened is closed.
+func openStdioKeepAlivePipes(dir string) (io.ReadCloser, io.ReadCloser, error) {
+ background := context.Background()
+ keepStdoutAlive, err := shimlog.OpenShimStdoutLog(background, dir)
+ if err != nil {
+ return nil, nil, err
+ }
+ keepStderrAlive, err := shimlog.OpenShimStderrLog(background, dir)
+ if err != nil {
+ keepStdoutAlive.Close()
+ return nil, nil, err
+ }
+ return keepStdoutAlive, keepStderrAlive, nil
+}
+
func executeShim() error {
// start handling signals as soon as possible so that things are properly reaped
// or if runtime exits before we hit the handler
diff --git a/container_linux_test.go b/container_linux_test.go
index 60b0336..fa764d7 100644
--- a/container_linux_test.go
+++ b/container_linux_test.go
@@ -24,7 +24,9 @@ import (
"fmt"
"io"
"io/ioutil"
+ "os"
"os/exec"
+ "path/filepath"
"runtime"
"strings"
"sync"
@@ -258,6 +260,226 @@ func TestDaemonRestart(t *testing.T) {
<-statusC
}
+// TestShimDoesNotLeakPipes runs a container through create, start, kill and
+// delete, then checks that containerd's pipe count (per lsof) is unchanged.
+func TestShimDoesNotLeakPipes(t *testing.T) {
+ containerdPid := ctrd.cmd.Process.Pid
+ initialPipes, err := numPipes(containerdPid)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ client, err := newClient(t, address)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ var (
+ image Image
+ ctx, cancel = testContext()
+ id = t.Name()
+ )
+ defer cancel()
+
+ image, err = client.GetImage(ctx, testImage)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ task, err := container.NewTask(ctx, empty())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exitChannel, err := task.Wait(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := task.Start(ctx); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
+ t.Fatal(err)
+ }
+
+ <-exitChannel
+
+ if _, err := task.Delete(ctx); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := container.Delete(ctx, WithSnapshotCleanup); err != nil {
+ t.Fatal(err)
+ }
+
+ currentPipes, err := numPipes(containerdPid)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if initialPipes != currentPipes {
+ t.Errorf("Pipes have leaked after container has been deleted. Initially there were %d pipes, after container deletion there were %d pipes", initialPipes, currentPipes)
+ }
+}
+
+// numPipes returns how many lines of `lsof -p pid` output mention "pipe".
+// grep exits non-zero when nothing matches, which is returned as an error.
+func numPipes(pid int) (int, error) {
+ cmd := exec.Command("sh", "-c", fmt.Sprintf("lsof -p %d | grep pipe", pid))
+
+ var stdout bytes.Buffer
+ cmd.Stdout = &stdout
+ if err := cmd.Run(); err != nil {
+ return 0, err
+ }
+ return strings.Count(stdout.String(), "\n"), nil
+}
+
+// TestDaemonReconnectsToShimIOPipesOnRestart restarts containerd while a task
+// is running, writes to the shim log files, and checks that the messages show
+// up in containerd's captured stdio (ctrdStdioFilePath).
+func TestDaemonReconnectsToShimIOPipesOnRestart(t *testing.T) {
+ client, err := newClient(t, address)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ var (
+ image Image
+ ctx, cancel = testContext()
+ id = t.Name()
+ )
+ defer cancel()
+
+ image, err = client.GetImage(ctx, testImage)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer container.Delete(ctx, WithSnapshotCleanup)
+
+ task, err := container.NewTask(ctx, empty())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer task.Delete(ctx)
+
+ _, err = task.Wait(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := task.Start(ctx); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := ctrd.Restart(nil); err != nil {
+ t.Fatal(err)
+ }
+
+ waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second)
+ serving, err := client.IsServing(waitCtx)
+ waitCancel()
+ if !serving {
+ t.Fatalf("containerd did not start within 2s: %v", err)
+ }
+
+ // After restarting containerd, write some messages to the log pipes to simulate the shim writing there.
+ // Then check that these messages appear in containerd's log, proving that the server reconnected to the log pipes.
+ runtimeVersion := getRuntimeVersion()
+ logDirPath := getLogDirPath(runtimeVersion, id)
+
+ switch runtimeVersion {
+ case "v1":
+ writeToFile(t, filepath.Join(logDirPath, "shim.stdout.log"), fmt.Sprintf("%s writing to stdout\n", id))
+ writeToFile(t, filepath.Join(logDirPath, "shim.stderr.log"), fmt.Sprintf("%s writing to stderr\n", id))
+ case "v2":
+ writeToFile(t, filepath.Join(logDirPath, "log"), fmt.Sprintf("%s writing to log\n", id))
+ }
+
+ statusC, err := task.Wait(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
+ t.Fatal(err)
+ }
+
+ <-statusC
+
+ stdioContents, err := ioutil.ReadFile(ctrdStdioFilePath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ switch runtimeVersion {
+ case "v1":
+ if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stdout", id)) {
+ t.Fatal("containerd did not connect to the shim stdout pipe")
+ }
+ if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stderr", id)) {
+ t.Fatal("containerd did not connect to the shim stderr pipe")
+ }
+ case "v2":
+ if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to log", id)) {
+ t.Fatal("containerd did not connect to the shim log pipe")
+ }
+ }
+}
+
+// writeToFile opens an existing file (or fifo) write-only, writes message and
+// closes it, failing the test on any error.
+func writeToFile(t *testing.T, filePath, message string) {
+ writer, err := os.OpenFile(filePath, os.O_WRONLY, 0600)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if _, err := writer.WriteString(message); err != nil {
+ t.Fatal(err)
+ }
+ if err := writer.Close(); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// getLogDirPath returns the directory holding the shim log files for container
+// id under the given runtime version; it panics on an unknown version.
+func getLogDirPath(runtimeVersion, id string) string {
+ switch runtimeVersion {
+ case "v1":
+ return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id)
+ case "v2":
+ return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
+ default:
+ panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion))
+ }
+}
+
+// getRuntimeVersion maps TEST_RUNTIME to "v2" for io.containerd.runc.v1 and to
+// "v1" for anything else (including unset).
+func getRuntimeVersion() string {
+ switch rt := os.Getenv("TEST_RUNTIME"); rt {
+ case "io.containerd.runc.v1":
+ return "v2"
+ default:
+ return "v1"
+ }
+}
+
func TestContainerPTY(t *testing.T) {
t.Parallel()
diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go
index d19b8e5..e1b3cac 100644
--- a/runtime/v1/linux/runtime.go
+++ b/runtime/v1/linux/runtime.go
@@ -21,6 +21,7 @@ package linux
import (
"context"
"fmt"
+ "io"
"io/ioutil"
"os"
"path/filepath"
@@ -40,6 +41,7 @@ import (
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
+ "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/linux/proc"
shim "github.com/containerd/containerd/runtime/v1/shim/v1"
runc "github.com/containerd/go-runc"
@@ -341,6 +343,36 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue
}
+ logDirPath := filepath.Join(r.root, ns, id)
+
+ shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath)
+ if err != nil {
+ log.G(ctx).WithError(err).WithFields(logrus.Fields{
+ "id": id,
+ "namespace": ns,
+ "logDirPath": logDirPath,
+ }).Error("opening shim stdout log pipe")
+ continue
+ }
+
+ shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath)
+ if err != nil {
+ // Close the stdout fifo opened above so it is not leaked.
+ shimStdoutLog.Close()
+ log.G(ctx).WithError(err).WithFields(logrus.Fields{
+ "id": id,
+ "namespace": ns,
+ "logDirPath": logDirPath,
+ }).Error("opening shim stderr log pipe")
+ continue
+ }
+
+ // Forward the shim's log output to containerd's own stdout/stderr.
+ // NOTE(review): the fifos are not handed to newTask, so nothing here closes
+ // them (or stops these copies) once the task exits.
+ go io.Copy(os.Stdout, shimStdoutLog)
+ go io.Copy(os.Stderr, shimStderrLog)
+
t, err := newTask(id, ns, pid, s, r.events, r.tasks, bundle)
if err != nil {
log.G(ctx).WithError(err).Error("loading task type")
diff --git a/runtime/v1/shim.go b/runtime/v1/shim.go
new file mode 100644
index 0000000..3942968
--- /dev/null
+++ b/runtime/v1/shim.go
@@ -0,0 +1,38 @@
+// +build !windows
+
+/*
+ Copyright The containerd Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package v1
+
+import (
+ "context"
+ "io"
+ "path/filepath"
+
+ "github.com/containerd/fifo"
+ "golang.org/x/sys/unix"
+)
+
+// OpenShimStdoutLog opens (creating it if needed) the "shim.stdout.log" fifo in logDirPath in read-write, non-blocking mode.
+func OpenShimStdoutLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
+ return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stdout.log"), unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
+}
+
+// OpenShimStderrLog opens (creating it if needed) the "shim.stderr.log" fifo in logDirPath in read-write, non-blocking mode.
+func OpenShimStderrLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
+ return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stderr.log"), unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
+}
diff --git a/runtime/v1/shim/client/client.go b/runtime/v1/shim/client/client.go
index 015d88c..ef74030 100644
--- a/runtime/v1/shim/client/client.go
+++ b/runtime/v1/shim/client/client.go
@@ -37,6 +37,7 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
+ v1 "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys"
@@ -62,7 +63,26 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
defer f.Close()
- cmd, err := newCommand(binary, daemonAddress, debug, config, f)
+ var stdoutLog io.ReadWriteCloser
+ var stderrLog io.ReadWriteCloser
+ if debug {
+ stdoutLog, err = v1.OpenShimStdoutLog(ctx, config.WorkDir)
+ if err != nil {
+ return nil, nil, errors.Wrap(err, "failed to create stdout log")
+ }
+
+ stderrLog, err = v1.OpenShimStderrLog(ctx, config.WorkDir)
+ if err != nil {
+ // Close the stdout fifo opened above so it is not leaked.
+ stdoutLog.Close()
+ return nil, nil, errors.Wrap(err, "failed to create stderr log")
+ }
+
+ go io.Copy(os.Stdout, stdoutLog)
+ go io.Copy(os.Stderr, stderrLog)
+ }
+
+ cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog)
if err != nil {
return nil, nil, err
}
@@ -77,6 +97,13 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
go func() {
cmd.Wait()
exitHandler()
+ // Release each debug log fifo (only opened when debug is set) once the shim exits.
+ if stdoutLog != nil {
+ stdoutLog.Close()
+ }
+ if stderrLog != nil {
+ stderrLog.Close()
+ }
}()
log.G(ctx).WithFields(logrus.Fields{
"pid": cmd.Process.Pid,
@@ -104,7 +131,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
}
-func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) (*exec.Cmd, error) {
+func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) {
selfExe, err := os.Executable()
if err != nil {
return nil, err
@@ -137,10 +164,9 @@ func newCommand(binary, daemonAddress string, debug bool, config shim.Config, so
cmd.SysProcAttr = getSysProcAttr()
cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
- if debug {
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- }
+ // A nil stdout/stderr (debug disabled) is connected to the null device by os/exec.
+ cmd.Stdout = stdout
+ cmd.Stderr = stderr
return cmd, nil
}
--
2.7.4.3