From 470a207ed468b5473d7833987791f01b467141a3 Mon Sep 17 00:00:00 2001 From: jingrui Date: Fri, 5 Feb 2021 15:10:27 +0800 Subject: [PATCH] containerd: backport upstream patches 2019-11-27 15:29:44 -0800 b97098762 Fix container pid. Lantao Liu lantaol@google.com 2019-06-21 15:28:16 -0400 f71f6d39b Robust pid locking for shim processes Michael Crosby crosbymichael.. 2019-06-20 16:13:51 -0400 42aba6e0f Add timeout for I/O waitgroups Michael Crosby crosbymichael.. 2019-02-01 02:08:49 -0800 5730c5003 Add a separate lock for pid. Lantao Liu lantaol@google.com 2019-01-31 18:59:29 -0800 b9b7ef32b Revert "use state machine management for exec.Pid()" Lantao Liu lantaol@google.com 2019-01-22 16:19:09 -0800 ab2cf0136 Use context.Background for `O_NONBLOCK` `OpenFifo`. Lantao Liu lantaol@google.com 2018-11-23 17:46:32 +0800 c42c8952b use state machine management for exec.Pid() Lifubang lifubang@acmcoder.com 2018-11-09 11:12:55 -0500 4c72befe0 Fix process locking and state management Michael Crosby crosbymichael.. 
Conflict:NA Reference:https://github.com/containerd/containerd/pull/3755 Change-Id: Ic7f768e72a38383c1b89680333c9ee234ea217aa Signed-off-by: jingrui --- runtime/proc/proc.go | 5 - runtime/v1/linux/proc/exec.go | 77 +++++++++--- runtime/v1/linux/proc/exec_state.go | 59 +++------ runtime/v1/linux/proc/init.go | 155 +++++++++++++++++------ runtime/v1/linux/proc/init_state.go | 182 +++++----------------------- runtime/v1/linux/proc/io.go | 2 +- runtime/v1/linux/proc/utils.go | 12 ++ runtime/v2/runc/service_linux.go | 2 +- runtime/v2/shim_unix.go | 2 +- 9 files changed, 235 insertions(+), 261 deletions(-) diff --git a/runtime/proc/proc.go b/runtime/proc/proc.go index 02bc9bda8..91ca59bb1 100644 --- a/runtime/proc/proc.go +++ b/runtime/proc/proc.go @@ -40,7 +40,6 @@ func (s Stdio) IsNull() bool { // Process on a system type Process interface { - State // ID returns the id for the process ID() string // Pid returns the pid for the process @@ -57,10 +56,6 @@ type Process interface { Status(context.Context) (string, error) // Wait blocks until the process has exited Wait() -} - -// State of a process -type State interface { // Resize resizes the process console Resize(ws console.WinSize) error // Start execution of the process diff --git a/runtime/v1/linux/proc/exec.go b/runtime/v1/linux/proc/exec.go index 08c581fbf..ea40cb5b8 100644 --- a/runtime/v1/linux/proc/exec.go +++ b/runtime/v1/linux/proc/exec.go @@ -31,6 +31,7 @@ import ( "golang.org/x/sys/unix" "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/proc" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" @@ -41,7 +42,7 @@ import ( type execProcess struct { wg sync.WaitGroup - proc.State + execState execState mu sync.Mutex id string @@ -49,7 +50,7 @@ type execProcess struct { io runc.IO status int exited time.Time - pid int + pid safePid closers []io.Closer stdin io.Closer stdio proc.Stdio @@ -69,9 +70,7 @@ func (e *execProcess) ID() 
string { } func (e *execProcess) Pid() int { - e.mu.Lock() - defer e.mu.Unlock() - return e.pid + return e.pid.get() } func (e *execProcess) ExitStatus() int { @@ -86,6 +85,13 @@ func (e *execProcess) ExitedAt() time.Time { return e.exited } +func (e *execProcess) SetExited(status int) { + e.mu.Lock() + defer e.mu.Unlock() + + e.execState.SetExited(status) +} + func (e *execProcess) setExited(status int) { e.status = status e.exited = time.Now() @@ -93,6 +99,13 @@ func (e *execProcess) setExited(status int) { close(e.waitBlock) } +func (e *execProcess) Delete(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Delete(ctx) +} + func (e *execProcess) delete(ctx context.Context) error { waitTimeout(ctx, &e.wg, 2*time.Second) if e.io != nil { @@ -107,6 +120,13 @@ func (e *execProcess) delete(ctx context.Context) error { return nil } +func (e *execProcess) Resize(ws console.WinSize) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Resize(ws) +} + func (e *execProcess) resize(ws console.WinSize) error { if e.console == nil { return nil @@ -114,9 +134,21 @@ func (e *execProcess) resize(ws console.WinSize) error { return e.console.Resize(ws) } +func (e *execProcess) Kill(ctx context.Context, sig uint32, _ bool) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Kill(ctx, sig, false) +} + func (e *execProcess) kill(ctx context.Context, sig uint32, _ bool) error { - pid := e.pid - if pid != 0 { + pid := e.pid.get() + switch { + case pid == 0: + return errors.Wrap(errdefs.ErrFailedPrecondition, "process not created") + case !e.exited.IsZero(): + return errors.Wrapf(errdefs.ErrNotFound, "process already finished") + default: if err := unix.Kill(pid, syscall.Signal(sig)); err != nil { return errors.Wrapf(checkKillError(err), "exec kill error") } @@ -132,7 +164,20 @@ func (e *execProcess) Stdio() proc.Stdio { return e.stdio } +func (e *execProcess) Start(ctx context.Context) error { + e.mu.Lock() + defer 
e.mu.Unlock() + + return e.execState.Start(ctx) +} + func (e *execProcess) start(ctx context.Context) (err error) { + // The reaper may receive exit signal right after + // the container is started, before the e.pid is updated. + // In that case, we want to block the signal handler to + // access e.pid until it is updated. + e.pid.Lock() + defer e.pid.Unlock() var ( socket *runc.Socket pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) @@ -164,7 +209,7 @@ func (e *execProcess) start(ctx context.Context) (err error) { return e.parent.runtimeError(err, "OCI runtime exec failed") } if e.stdio.Stdin != "" { - sc, err := fifo.OpenFifo(ctx, e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin) } @@ -173,29 +218,26 @@ func (e *execProcess) start(ctx context.Context) (err error) { } var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() if socket != nil { console, err := socket.ReceiveMaster() if err != nil { - cancel() return errors.Wrap(err, "failed to retrieve console master") } if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil { - cancel() return errors.Wrap(err, "failed to start console copy") } } else if !e.stdio.IsNull() { if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil { - cancel() return errors.Wrap(err, "failed to start io pipe copy") } } copyWaitGroup.Wait() pid, err := runc.ReadPidFile(opts.PidFile) if err != nil { - cancel() return errors.Wrap(err, "failed to retrieve OCI runtime exec pid") } - e.pid = pid + e.pid.pid = pid return nil } @@ -212,12 +254,15 @@ func (e *execProcess) Status(ctx context.Context) (string, error) { } e.mu.Lock() defer e.mu.Unlock() - 
// if we don't have a pid then the exec process has just been created - if e.pid == 0 { + // if we don't have a pid(pid=0) then the exec process has just been created + if e.pid.get() == 0 { return "created", nil } + if e.pid.get() == -1 { + return "stopped", nil + } // if we have a pid and it can be signaled, the process is running - if err := unix.Kill(e.pid, 0); err == nil { + if err := unix.Kill(e.pid.get(), 0); err == nil { return "running", nil } // else if we have a pid but it can nolonger be signaled, it has stopped diff --git a/runtime/v1/linux/proc/exec_state.go b/runtime/v1/linux/proc/exec_state.go index ac5467552..12489501b 100644 --- a/runtime/v1/linux/proc/exec_state.go +++ b/runtime/v1/linux/proc/exec_state.go @@ -25,6 +25,14 @@ import ( "github.com/pkg/errors" ) +type execState interface { + Resize(console.WinSize) error + Start(context.Context) error + Delete(context.Context) error + Kill(context.Context, uint32, bool) error + SetExited(int) +} + type execCreatedState struct { p *execProcess } @@ -32,11 +40,11 @@ type execCreatedState struct { func (s *execCreatedState) transition(name string) error { switch name { case "running": - s.p.State = &execRunningState{p: s.p} + s.p.execState = &execRunningState{p: s.p} case "stopped": - s.p.State = &execStoppedState{p: s.p} + s.p.execState = &execStoppedState{p: s.p} case "deleted": - s.p.State = &deletedState{} + s.p.execState = &deletedState{} default: return errors.Errorf("invalid state transition %q to %q", stateName(s), name) } @@ -44,15 +52,10 @@ func (s *execCreatedState) transition(name string) error { } func (s *execCreatedState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.resize(ws) } func (s *execCreatedState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() if err := s.p.start(ctx); err != nil { return err } @@ -63,22 +66,15 @@ func (s *execCreatedState) Delete(ctx context.Context) error { if err := s.p.delete(ctx); err != 
nil { return err } - s.p.mu.Lock() - defer s.p.mu.Unlock() + return s.transition("deleted") } func (s *execCreatedState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } func (s *execCreatedState) SetExited(status int) { - s.p.mu.Lock() - defer s.p.mu.Unlock() - s.p.setExited(status) if err := s.transition("stopped"); err != nil { @@ -93,7 +89,7 @@ type execRunningState struct { func (s *execRunningState) transition(name string) error { switch name { case "stopped": - s.p.State = &execStoppedState{p: s.p} + s.p.execState = &execStoppedState{p: s.p} default: return errors.Errorf("invalid state transition %q to %q", stateName(s), name) } @@ -101,37 +97,22 @@ func (s *execRunningState) transition(name string) error { } func (s *execRunningState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.resize(ws) } func (s *execRunningState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot start a running process") } func (s *execRunningState) Delete(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot delete a running process") } func (s *execRunningState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } func (s *execRunningState) SetExited(status int) { - s.p.mu.Lock() - defer s.p.mu.Unlock() - s.p.setExited(status) if err := s.transition("stopped"); err != nil { @@ -146,7 +127,7 @@ type execStoppedState struct { func (s *execStoppedState) transition(name string) error { switch name { case "deleted": - s.p.State = &deletedState{} + s.p.execState = &deletedState{} default: return errors.Errorf("invalid state transition %q to %q", stateName(s), name) } @@ -154,16 +135,10 @@ func (s *execStoppedState) transition(name string) error { } func (s *execStoppedState) Resize(ws 
console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot resize a stopped container") } func (s *execStoppedState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot start a stopped process") } @@ -171,15 +146,11 @@ func (s *execStoppedState) Delete(ctx context.Context) error { if err := s.p.delete(ctx); err != nil { return err } - s.p.mu.Lock() - defer s.p.mu.Unlock() + return s.transition("deleted") } func (s *execStoppedState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } diff --git a/runtime/v1/linux/proc/init.go b/runtime/v1/linux/proc/init.go index 669c1085d..108234904 100644 --- a/runtime/v1/linux/proc/init.go +++ b/runtime/v1/linux/proc/init.go @@ -52,8 +52,8 @@ const DefaultRunvRoot = "/run/runv" // Init represents an initial process for a container type Init struct { - wg sync.WaitGroup - initState + wg sync.WaitGroup + initState initState // mu is used to ensure that `Start()` and `Exited()` calls return in // the right order when invoked in separate go routines. 
@@ -65,12 +65,12 @@ type Init struct { WorkDir string - id string - Bundle string - console console.Console - Platform proc.Platform - io runc.IO - runtime *runc.Runc + id string + Bundle string + console console.Console + Platform proc.Platform + io runc.IO + runtime *runc.Runc status int exited time.Time pid int @@ -138,6 +138,7 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { err error socket *runc.Socket ) + pidFile := filepath.Join(p.Bundle, InitPidFile) if legacy.IsLegacy(r.ID) { @@ -195,7 +196,7 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { return p.runtimeError(err, "OCI runtime create failed") } if r.Stdin != "" { - sc, err := fifo.OpenFifo(ctx, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { return errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin) } @@ -204,21 +205,19 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { } var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() if socket != nil { console, err := socket.ReceiveMaster() if err != nil { - cancel() return errors.Wrap(err, "failed to retrieve console master") } console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup) if err != nil { - cancel() return errors.Wrap(err, "failed to start console copy") } p.console = console } else if !hasNoIO(r) { if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil { - cancel() return errors.Wrap(err, "failed to start io pipe copy") } } @@ -226,7 +225,6 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { copyWaitGroup.Wait() pid, err := runc.ReadPidFile(pidFile) if err != nil { - cancel() return errors.Wrap(err, "failed to retrieve OCI runtime container pid") } p.pid = pid @@ -266,6 +264,7 @@ func (p *Init) ExitedAt() time.Time { 
func (p *Init) Status(ctx context.Context) (string, error) { p.mu.Lock() defer p.mu.Unlock() + c, err := p.runtime.State(ctx, p.id) if err != nil { if strings.Contains(err.Error(), "does not exist") { @@ -276,14 +275,30 @@ func (p *Init) Status(ctx context.Context) (string, error) { return c.Status, nil } -func (p *Init) start(context context.Context) error { - err := p.runtime.Start(context, p.id) +// Start the init process +func (p *Init) Start(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Start(ctx) +} + +func (p *Init) start(ctx context.Context) error { + err := p.runtime.Start(ctx, p.id) if err != nil { utils.KillInitProcess(p.id, p.pid) } return p.runtimeError(err, "OCI runtime start failed") } +// SetExited of the init process with the next status +func (p *Init) SetExited(status int) { + p.mu.Lock() + defer p.mu.Unlock() + + p.initState.SetExited(status) +} + func (p *Init) setExited(status int) { p.exited = time.Now() p.status = status @@ -291,9 +306,17 @@ func (p *Init) setExited(status int) { close(p.waitBlock) } -func (p *Init) delete(context context.Context) error { - waitTimeout(context, &p.wg, 2*time.Second) - err := p.runtime.Delete(context, p.id, nil) +// Delete the init process +func (p *Init) Delete(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Delete(ctx) +} + +func (p *Init) delete(ctx context.Context) error { + waitTimeout(ctx, &p.wg, 2*time.Second) + err := p.runtime.Delete(ctx, p.id, nil) // ignore errors if a runtime has already deleted the process // but we still hold metadata and pipes // @@ -312,15 +335,28 @@ func (p *Init) delete(context context.Context) error { } p.io.Close() } - if err2 := mount.UnmountAll(p.Rootfs, 0); err2 != nil { - log.G(context).WithError(err2).Warn("failed to cleanup rootfs mount") - if err == nil { - err = errors.Wrap(err2, "failed rootfs umount") + if p.Rootfs != "" { + if err2 := mount.UnmountAll(p.Rootfs, 0); err2 != nil { + 
log.G(ctx).WithError(err2).Warn("failed to cleanup rootfs mount") + if err == nil { + err = errors.Wrap(err2, "failed rootfs umount") + } } } return err } +// Resize the init processes console +func (p *Init) Resize(ws console.WinSize) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.console == nil { + return nil + } + return p.console.Resize(ws) +} + func (p *Init) resize(ws console.WinSize) error { if p.console == nil { return nil @@ -328,26 +364,43 @@ func (p *Init) resize(ws console.WinSize) error { return p.console.Resize(ws) } -func (p *Init) pause(context context.Context) error { - err := p.runtime.Pause(context, p.id) - return p.runtimeError(err, "OCI runtime pause failed") +// Pause the init process and all its child processes +func (p *Init) Pause(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Pause(ctx) +} + +// Resume the init process and all its child processes +func (p *Init) Resume(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Resume(ctx) } -func (p *Init) resume(context context.Context) error { - err := p.runtime.Resume(context, p.id) - return p.runtimeError(err, "OCI runtime resume failed") +// Kill the init process +func (p *Init) Kill(ctx context.Context, signal uint32, all bool) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Kill(ctx, signal, all) } -func (p *Init) kill(context context.Context, signal uint32, all bool) error { - err := p.runtime.Kill(context, p.id, int(signal), &runc.KillOpts{ +func (p *Init) kill(ctx context.Context, signal uint32, all bool) error { + err := p.runtime.Kill(ctx, p.id, int(signal), &runc.KillOpts{ All: all, }) return checkKillError(err) } // KillAll processes belonging to the init process -func (p *Init) KillAll(context context.Context) error { - err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{ +func (p *Init) KillAll(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() 
+ + err := p.runtime.Kill(ctx, p.id, int(syscall.SIGKILL), &runc.KillOpts{ All: true, }) return p.runtimeError(err, "OCI runtime killall failed") @@ -363,8 +416,16 @@ func (p *Init) Runtime() *runc.Runc { return p.runtime } +// Exec returns a new child process +func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Exec(ctx, path, r) +} + // exec returns a new exec'd process -func (p *Init) exec(context context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { // process exec request var spec specs.Process if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { @@ -385,18 +446,26 @@ func (p *Init) exec(context context.Context, path string, r *ExecConfig) (proc.P }, waitBlock: make(chan struct{}), } - e.State = &execCreatedState{p: e} + e.execState = &execCreatedState{p: e} return e, nil } -func (p *Init) checkpoint(context context.Context, r *CheckpointConfig) error { +// Checkpoint the init process +func (p *Init) Checkpoint(ctx context.Context, r *CheckpointConfig) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Checkpoint(ctx, r) +} + +func (p *Init) checkpoint(ctx context.Context, r *CheckpointConfig) error { var actions []runc.CheckpointAction if !r.Exit { actions = append(actions, runc.LeaveRunning) } work := filepath.Join(p.WorkDir, "criu-work") defer os.RemoveAll(work) - if err := p.runtime.Checkpoint(context, p.id, &runc.CheckpointOpts{ + if err := p.runtime.Checkpoint(ctx, p.id, &runc.CheckpointOpts{ WorkDir: work, ImagePath: r.Path, AllowOpenTCP: r.AllowOpenTCP, @@ -407,19 +476,27 @@ func (p *Init) checkpoint(context context.Context, r *CheckpointConfig) error { }, actions...); err != nil { dumpLog := filepath.Join(p.Bundle, "criu-dump.log") if cerr := copyFile(dumpLog, filepath.Join(work, "dump.log")); cerr != nil { - 
log.G(context).Error(err) + log.G(ctx).Error(err) } return fmt.Errorf("%s path= %s", criuError(err), dumpLog) } return nil } -func (p *Init) update(context context.Context, r *google_protobuf.Any) error { +// Update the processes resource configuration +func (p *Init) Update(ctx context.Context, r *google_protobuf.Any) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Update(ctx, r) +} + +func (p *Init) update(ctx context.Context, r *google_protobuf.Any) error { var resources specs.LinuxResources if err := json.Unmarshal(r.Value, &resources); err != nil { return err } - return p.runtime.Update(context, p.id, &resources) + return p.runtime.Update(ctx, p.id, &resources) } // Stdio of the process diff --git a/runtime/v1/linux/proc/init_state.go b/runtime/v1/linux/proc/init_state.go index 6a6b448d3..e83934e9c 100644 --- a/runtime/v1/linux/proc/init_state.go +++ b/runtime/v1/linux/proc/init_state.go @@ -30,16 +30,20 @@ import ( runc "github.com/containerd/go-runc" google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) type initState interface { - proc.State - + Resize(console.WinSize) error + Start(context.Context) error + Delete(context.Context) error Pause(context.Context) error Resume(context.Context) error Update(context.Context, *google_protobuf.Any) error Checkpoint(context.Context, *CheckpointConfig) error Exec(context.Context, string, *ExecConfig) (proc.Process, error) + Kill(context.Context, uint32, bool) error + SetExited(int) } type createdState struct { @@ -61,43 +65,26 @@ func (s *createdState) transition(name string) error { } func (s *createdState) Pause(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot pause task in created state") } func (s *createdState) Resume(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot resume task in created state") } -func (s *createdState) Update(context 
context.Context, r *google_protobuf.Any) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - - return s.p.update(context, r) +func (s *createdState) Update(ctx context.Context, r *google_protobuf.Any) error { + return s.p.update(ctx, r) } -func (s *createdState) Checkpoint(context context.Context, r *CheckpointConfig) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - +func (s *createdState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { return errors.Errorf("cannot checkpoint a task in created state") } func (s *createdState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.resize(ws) } func (s *createdState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() if err := s.p.start(ctx); err != nil { return err } @@ -105,8 +92,6 @@ func (s *createdState) Start(ctx context.Context) error { } func (s *createdState) Delete(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() if err := s.p.delete(ctx); err != nil { return err } @@ -114,16 +99,10 @@ func (s *createdState) Delete(ctx context.Context) error { } func (s *createdState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } func (s *createdState) SetExited(status int) { - s.p.mu.Lock() - defer s.p.mu.Unlock() - s.p.setExited(status) if err := s.transition("stopped"); err != nil { @@ -132,8 +111,6 @@ func (s *createdState) SetExited(status int) { } func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { - s.p.mu.Lock() - defer s.p.mu.Unlock() return s.p.exec(ctx, path, r) } @@ -157,43 +134,26 @@ func (s *createdCheckpointState) transition(name string) error { } func (s *createdCheckpointState) Pause(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot pause task in created state") } func (s *createdCheckpointState) Resume(ctx context.Context) error { - 
s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot resume task in created state") } -func (s *createdCheckpointState) Update(context context.Context, r *google_protobuf.Any) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - - return s.p.update(context, r) +func (s *createdCheckpointState) Update(ctx context.Context, r *google_protobuf.Any) error { + return s.p.update(ctx, r) } -func (s *createdCheckpointState) Checkpoint(context context.Context, r *CheckpointConfig) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - +func (s *createdCheckpointState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { return errors.Errorf("cannot checkpoint a task in created state") } func (s *createdCheckpointState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.resize(ws) } func (s *createdCheckpointState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() p := s.p sio := p.stdio @@ -213,7 +173,7 @@ func (s *createdCheckpointState) Start(ctx context.Context) error { return p.runtimeError(err, "OCI runtime restore failed") } if sio.Stdin != "" { - sc, err := fifo.OpenFifo(ctx, sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + sc, err := fifo.OpenFifo(context.Background(), sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin) } @@ -247,8 +207,6 @@ func (s *createdCheckpointState) Start(ctx context.Context) error { } func (s *createdCheckpointState) Delete(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() if err := s.p.delete(ctx); err != nil { return err } @@ -256,16 +214,10 @@ func (s *createdCheckpointState) Delete(ctx context.Context) error { } func (s *createdCheckpointState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } func (s *createdCheckpointState) SetExited(status int) { - s.p.mu.Lock() - defer 
s.p.mu.Unlock() - s.p.setExited(status) if err := s.transition("stopped"); err != nil { @@ -274,9 +226,6 @@ func (s *createdCheckpointState) SetExited(status int) { } func (s *createdCheckpointState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return nil, errors.Errorf("cannot exec in a created state") } @@ -297,67 +246,42 @@ func (s *runningState) transition(name string) error { } func (s *runningState) Pause(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - if err := s.p.pause(ctx); err != nil { - return err + if err := s.p.runtime.Pause(ctx, s.p.id); err != nil { + return s.p.runtimeError(err, "OCI runtime pause failed") } + return s.transition("paused") } func (s *runningState) Resume(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot resume a running process") } -func (s *runningState) Update(context context.Context, r *google_protobuf.Any) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - - return s.p.update(context, r) +func (s *runningState) Update(ctx context.Context, r *google_protobuf.Any) error { + return s.p.update(ctx, r) } func (s *runningState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.checkpoint(ctx, r) } func (s *runningState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.resize(ws) } func (s *runningState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot start a running process") } func (s *runningState) Delete(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot delete a running process") } func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } func (s *runningState) SetExited(status int) { 
- s.p.mu.Lock() - defer s.p.mu.Unlock() - s.p.setExited(status) if err := s.transition("stopped"); err != nil { @@ -366,8 +290,6 @@ func (s *runningState) SetExited(status int) { } func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { - s.p.mu.Lock() - defer s.p.mu.Unlock() return s.p.exec(ctx, path, r) } @@ -388,79 +310,54 @@ func (s *pausedState) transition(name string) error { } func (s *pausedState) Pause(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot pause a paused container") } func (s *pausedState) Resume(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - - if err := s.p.resume(ctx); err != nil { - return err + if err := s.p.runtime.Resume(ctx, s.p.id); err != nil { + return s.p.runtimeError(err, "OCI runtime resume failed") } + return s.transition("running") } -func (s *pausedState) Update(context context.Context, r *google_protobuf.Any) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - - return s.p.update(context, r) +func (s *pausedState) Update(ctx context.Context, r *google_protobuf.Any) error { + return s.p.update(ctx, r) } func (s *pausedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.checkpoint(ctx, r) } func (s *pausedState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.resize(ws) } func (s *pausedState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot start a paused process") } func (s *pausedState) Delete(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot delete a paused process") } func (s *pausedState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return s.p.kill(ctx, sig, all) } func (s *pausedState) SetExited(status int) { - s.p.mu.Lock() - defer 
s.p.mu.Unlock() - s.p.setExited(status) + if err := s.p.runtime.Resume(context.Background(), s.p.id); err != nil { + logrus.WithError(err).Error("resuming exited container from paused state") + } + if err := s.transition("stopped"); err != nil { panic(err) } } func (s *pausedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return nil, errors.Errorf("cannot exec in a paused state") } @@ -479,50 +376,30 @@ func (s *stoppedState) transition(name string) error { } func (s *stoppedState) Pause(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot pause a stopped container") } func (s *stoppedState) Resume(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot resume a stopped container") } -func (s *stoppedState) Update(context context.Context, r *google_protobuf.Any) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - +func (s *stoppedState) Update(ctx context.Context, r *google_protobuf.Any) error { return errors.Errorf("cannot update a stopped container") } func (s *stoppedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot checkpoint a stopped container") } func (s *stoppedState) Resize(ws console.WinSize) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot resize a stopped container") } func (s *stoppedState) Start(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - return errors.Errorf("cannot start a stopped process") } func (s *stoppedState) Delete(ctx context.Context) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() if err := s.p.delete(ctx); err != nil { return err } @@ -538,8 +415,5 @@ func (s *stoppedState) SetExited(status int) { } func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { - s.p.mu.Lock() - defer 
s.p.mu.Unlock() - return nil, errors.Errorf("cannot exec in a stopped state") } diff --git a/runtime/v1/linux/proc/io.go b/runtime/v1/linux/proc/io.go index 360662701..e620f5840 100644 --- a/runtime/v1/linux/proc/io.go +++ b/runtime/v1/linux/proc/io.go @@ -114,7 +114,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w if stdin == "" { return nil } - f, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) if err != nil { return fmt.Errorf("containerd-shim syscall.O_RDONLY|syscall.O_NONBLOCK: opening %s failed: %s", stdin, err) } diff --git a/runtime/v1/linux/proc/utils.go b/runtime/v1/linux/proc/utils.go index d6f047cee..7312dec68 100644 --- a/runtime/v1/linux/proc/utils.go +++ b/runtime/v1/linux/proc/utils.go @@ -33,6 +33,18 @@ import ( "golang.org/x/sys/unix" ) +// safePid is a thread safe wrapper for pid. +type safePid struct { + sync.Mutex + pid int +} + +func (s *safePid) get() int { + s.Lock() + defer s.Unlock() + return s.pid +} + // TODO(mlaventure): move to runc package? 
func getLastRuntimeError(r *runc.Runc) (string, error) { if r.Log == "" { diff --git a/runtime/v2/runc/service_linux.go b/runtime/v2/runc/service_linux.go index 116167352..195c23014 100644 --- a/runtime/v2/runc/service_linux.go +++ b/runtime/v2/runc/service_linux.go @@ -42,7 +42,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console } if stdin != "" { - in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) if err != nil { return nil, err } diff --git a/runtime/v2/shim_unix.go b/runtime/v2/shim_unix.go index 1a08be5d1..6738a7787 100644 --- a/runtime/v2/shim_unix.go +++ b/runtime/v2/shim_unix.go @@ -28,5 +28,5 @@ import ( ) func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) { - return fifo.OpenFifo(ctx, filepath.Join(bundle.Path, "log"), unix.O_RDONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700) + return fifo.OpenFifo(ctx, filepath.Join(bundle.Path, "log"), unix.O_RDWR|unix.O_CREAT, 0700) } -- 2.17.1