2024-04-26 16:27:21 +08:00
|
|
|
From 5ad72e2b135afb5fbe5112f901bdec79f8943611 Mon Sep 17 00:00:00 2001
|
|
|
|
|
From: liuxu <liuxu156@huawei.com>
|
|
|
|
|
Date: Fri, 26 Apr 2024 10:49:14 +0800
|
2023-07-21 10:41:06 +08:00
|
|
|
Subject: [PATCH] kubelet support exec and attach websocket protocol
|
2021-02-03 15:04:00 +08:00
|
|
|
|
2024-04-26 16:27:21 +08:00
|
|
|
Signed-off-by: liuxu <liuxu156@huawei.com>
|
2021-02-03 15:04:00 +08:00
|
|
|
---
|
2023-01-03 15:05:23 +08:00
|
|
|
pkg/kubelet/server/server.go | 43 +++-
|
2024-04-26 16:27:21 +08:00
|
|
|
.../pkg/cri/streaming/remotecommand/proxy.go | 212 ++++++++++++++++++
|
2023-07-21 10:41:06 +08:00
|
|
|
2 files changed, 247 insertions(+), 8 deletions(-)
|
2024-04-26 16:27:21 +08:00
|
|
|
create mode 100644 staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go
|
2021-02-03 15:04:00 +08:00
|
|
|
|
2024-04-26 16:27:21 +08:00
|
|
|
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
|
|
|
|
|
index 87a017f9..887e663a 100644
|
|
|
|
|
--- a/pkg/kubelet/server/server.go
|
|
|
|
|
+++ b/pkg/kubelet/server/server.go
|
|
|
|
|
@@ -834,51 +834,78 @@ func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
|
|
|
|
|
|
|
|
|
|
// getAttach handles requests to attach to a container.
|
|
|
|
|
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
|
|
|
|
|
- params := getExecRequestParams(request)
|
|
|
|
|
streamOpts, err := remotecommandserver.NewOptions(request.Request)
|
|
|
|
|
if err != nil {
|
|
|
|
|
utilruntime.HandleError(err)
|
|
|
|
|
response.WriteError(http.StatusBadRequest, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ url, err := s.getAttachUrl(request, response, streamOpts)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ klog.Errorf("failed to get backend url %v", err)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ if url.Scheme == "ws" || url.Scheme == "wss" {
|
|
|
|
|
+ remotecommandserver.ProxyToWebSocket(response.ResponseWriter, request.Request, url, streamOpts)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ proxyStream(response.ResponseWriter, request.Request, url)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *Server) getAttachUrl(request *restful.Request, response *restful.Response, streamOpts *remotecommandserver.Options) (*url.URL, error) {
|
|
|
|
|
+ params := getExecRequestParams(request)
|
|
|
|
|
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
|
|
|
|
if !ok {
|
|
|
|
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
|
|
|
|
- return
|
|
|
|
|
+ return nil, fmt.Errorf("pod not found")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
podFullName := kubecontainer.GetPodFullName(pod)
|
|
|
|
|
url, err := s.host.GetAttach(request.Request.Context(), podFullName, params.podUID, params.containerName, *streamOpts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
streaming.WriteError(err, response.ResponseWriter)
|
|
|
|
|
- return
|
|
|
|
|
+ return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- proxyStream(response.ResponseWriter, request.Request, url)
|
|
|
|
|
+ return url, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getExec handles requests to run a command inside a container.
|
|
|
|
|
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
|
|
|
|
- params := getExecRequestParams(request)
|
|
|
|
|
streamOpts, err := remotecommandserver.NewOptions(request.Request)
|
|
|
|
|
if err != nil {
|
|
|
|
|
utilruntime.HandleError(err)
|
|
|
|
|
response.WriteError(http.StatusBadRequest, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
+ url, err := s.getExecUrl(request, response, streamOpts)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ klog.Errorf("failed to get backend url %v", err)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ if url.Scheme == "ws" || url.Scheme == "wss" {
|
|
|
|
|
+ remotecommandserver.ProxyToWebSocket(response.ResponseWriter, request.Request, url, streamOpts)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ proxyStream(response.ResponseWriter, request.Request, url)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s *Server) getExecUrl(request *restful.Request, response *restful.Response, streamOpts *remotecommandserver.Options) (*url.URL, error) {
|
|
|
|
|
+ params := getExecRequestParams(request)
|
|
|
|
|
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
|
|
|
|
if !ok {
|
|
|
|
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
|
|
|
|
- return
|
|
|
|
|
+ return nil, fmt.Errorf("pod not found")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
podFullName := kubecontainer.GetPodFullName(pod)
|
|
|
|
|
url, err := s.host.GetExec(request.Request.Context(), podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
streaming.WriteError(err, response.ResponseWriter)
|
|
|
|
|
- return
|
|
|
|
|
+ return nil, err
|
|
|
|
|
}
|
|
|
|
|
- proxyStream(response.ResponseWriter, request.Request, url)
|
|
|
|
|
+ return url, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getRun handles requests to run a command inside a container.
|
|
|
|
|
diff --git a/staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go b/staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go
|
2021-02-03 15:04:00 +08:00
|
|
|
new file mode 100644
|
2023-07-21 10:41:06 +08:00
|
|
|
index 00000000..179d8183
|
2021-02-03 15:04:00 +08:00
|
|
|
--- /dev/null
|
2024-04-26 16:27:21 +08:00
|
|
|
+++ b/staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go
|
2023-07-21 10:41:06 +08:00
|
|
|
@@ -0,0 +1,212 @@
|
2021-02-03 15:04:00 +08:00
|
|
|
+package remotecommand
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "bytes"
|
|
|
|
|
+ "errors"
|
|
|
|
|
+ "fmt"
|
|
|
|
|
+ "io"
|
|
|
|
|
+ "net/http"
|
|
|
|
|
+ "net/url"
|
|
|
|
|
+ "strings"
|
|
|
|
|
+ "time"
|
|
|
|
|
+
|
|
|
|
|
+ "github.com/gorilla/websocket"
|
|
|
|
|
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
|
+ "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
|
+ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
|
|
|
|
|
+ "k8s.io/apimachinery/pkg/util/runtime"
|
2023-07-21 10:41:06 +08:00
|
|
|
+ "k8s.io/client-go/util/exec"
|
2021-02-03 15:04:00 +08:00
|
|
|
+ "k8s.io/klog/v2"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+var (
|
|
|
|
|
+ streamIdleTimeout = 4 * time.Hour
|
|
|
|
|
+ streamCreationTimeout = remotecommandconsts.DefaultStreamCreationTimeout
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+// ProxyToWebSocket proxies the frontend stream to the backend url over websocket.
|
|
|
|
|
+func ProxyToWebSocket(w http.ResponseWriter, r *http.Request, url *url.URL, opts *Options) {
|
|
|
|
|
+ klog.V(8).Infof("start proxy request to websocket %+v", r)
|
|
|
|
|
+ ctx, ok := createStreams(
|
|
|
|
|
+ r,
|
|
|
|
|
+ w,
|
|
|
|
|
+ opts,
|
|
|
|
|
+ remotecommandconsts.SupportedStreamingProtocols,
|
|
|
|
|
+ streamIdleTimeout,
|
|
|
|
|
+ streamCreationTimeout)
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ msg := "failed to create stream to fontend"
|
|
|
|
|
+ klog.Error(msg)
|
|
|
|
|
+ http.Error(w, msg, http.StatusInternalServerError)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ if err := ctx.conn.Close(); err != nil {
|
|
|
|
|
+ klog.Errorf("failed to close connection, %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ klog.V(8).Infof("start connecting to websocket %s", url.String())
|
|
|
|
|
+ backendConn, err := connectBackend(url.String(), "channel.k8s.io", r)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ msg := fmt.Sprintf("connectBackend failed: %v", err)
|
|
|
|
|
+ klog.Error(msg)
|
|
|
|
|
+ http.Error(w, msg, http.StatusInternalServerError)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ defer backendConn.Close()
|
|
|
|
|
+
|
|
|
|
|
+ var errConnection error
|
|
|
|
|
+ frontendStdinToBackendComplete := make(chan struct{})
|
|
|
|
|
+ frontendResizeToBackendComplete := make(chan struct{})
|
|
|
|
|
+ backendToFrontendComplete := make(chan struct{})
|
|
|
|
|
+
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ for {
|
|
|
|
|
+ _, msg, err := backendConn.ReadMessage()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ e, ok := err.(*websocket.CloseError)
|
|
|
|
|
+ if !ok || e.Code != websocket.CloseNormalClosure {
|
|
|
|
|
+ errConnection = err
|
|
|
|
|
+ }
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if len(msg) < 1 {
|
|
|
|
|
+ errConnection = fmt.Errorf("received err msg from backEnd (the length less than 1), msg: %s", string(msg))
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ switch msg[0] {
|
|
|
|
|
+ case stdoutChannel:
|
|
|
|
|
+ _, err = ctx.stdoutStream.Write(msg[1:])
|
|
|
|
|
+ case stderrChannel:
|
|
|
|
|
+ _, err = ctx.stderrStream.Write(msg[1:])
|
|
|
|
|
+ case errorChannel:
|
|
|
|
|
+ err = ctx.writeStatus(apierrors.NewInternalError(errors.New(string(msg[1:]))))
|
|
|
|
|
+ default:
|
|
|
|
|
+ err = fmt.Errorf("received invalid msg from backEnd, msg: %s", string(msg))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ errConnection = err
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ close(backendToFrontendComplete)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ if opts.Stdin {
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ r := &rwc{
|
|
|
|
|
+ c: backendConn,
|
|
|
|
|
+ index: stdinChannel,
|
|
|
|
|
+ }
|
|
|
|
|
+ _, err := io.Copy(r, ctx.stdinStream)
|
|
|
|
|
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
|
|
|
|
+ errConnection = fmt.Errorf("copy data from frontend(stdinStream) to backend failed, err: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ close(frontendStdinToBackendComplete)
|
|
|
|
|
+ }()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if opts.TTY {
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ r := &rwc{
|
|
|
|
|
+ c: backendConn,
|
|
|
|
|
+ index: resizeChannel,
|
|
|
|
|
+ }
|
|
|
|
|
+ _, err := io.Copy(r, ctx.resizeStream)
|
|
|
|
|
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
|
|
|
|
+ errConnection = fmt.Errorf("copy data from frontend(resizeStream) to backend failed, err: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ close(frontendResizeToBackendComplete)
|
|
|
|
|
+ }()
|
|
|
|
|
+ }
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-backendToFrontendComplete:
|
|
|
|
|
+ case <-frontendStdinToBackendComplete:
|
|
|
|
|
+ case <-frontendResizeToBackendComplete:
|
|
|
|
|
+ }
|
|
|
|
|
+
|
2023-07-21 10:41:06 +08:00
|
|
|
+ select {
|
|
|
|
|
+ case <-backendToFrontendComplete:
|
|
|
|
|
+ case <-time.Tick(30 * time.Second):
|
|
|
|
|
+ klog.Errorf("Wait backend to frontend complete timeout")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
2021-02-03 15:04:00 +08:00
|
|
|
+ if errConnection != nil {
|
|
|
|
|
+ klog.Errorf("SpdyProxy: the connection disconnected: %v", errConnection)
|
|
|
|
|
+ if exitErr, ok := errConnection.(exec.ExitError); ok && exitErr.Exited() {
|
|
|
|
|
+ rc := exitErr.ExitStatus()
|
|
|
|
|
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{
|
|
|
|
|
+ Status: v1.StatusFailure,
|
|
|
|
|
+ Reason: remotecommandconsts.NonZeroExitCodeReason,
|
|
|
|
|
+ Details: &v1.StatusDetails{
|
|
|
|
|
+ Causes: []v1.StatusCause{
|
|
|
|
|
+ {
|
|
|
|
|
+ Type: remotecommandconsts.ExitCodeCauseType,
|
|
|
|
|
+ Message: fmt.Sprintf("%d", rc),
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
|
|
|
|
|
+ }})
|
|
|
|
|
+ } else if closeErr, ok := errConnection.(*websocket.CloseError); !ok || closeErr.Text != io.ErrUnexpectedEOF.Error() {
|
|
|
|
|
+ // ignore this ErrUnexpectedEOF because isulad always closes the connection while reading a frame
|
|
|
|
|
+ err = fmt.Errorf("error executing command in container: %v", errConnection)
|
|
|
|
|
+ runtime.HandleError(err)
|
|
|
|
|
+ ctx.writeStatus(apierrors.NewInternalError(err))
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{
|
|
|
|
|
+ Status: v1.StatusSuccess,
|
|
|
|
|
+ }})
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func connectBackend(addr, subprotocol string, r *http.Request) (*websocket.Conn, error) {
|
|
|
|
|
+ h := http.Header{}
|
|
|
|
|
+ originHeadValue := r.Header.Get("Origin")
|
|
|
|
|
+ if originHeadValue != "" {
|
|
|
|
|
+ h["Origin"] = []string{originHeadValue}
|
|
|
|
|
+ }
|
|
|
|
|
+ websocket.DefaultDialer.Subprotocols = []string{subprotocol}
|
|
|
|
|
+ websocket.DefaultDialer.ReadBufferSize = 128 * 1024
|
|
|
|
|
+ websocket.DefaultDialer.WriteBufferSize = 128 * 1024
|
|
|
|
|
+ ws, resp, err := websocket.DefaultDialer.Dial(addr, h)
|
2023-01-03 15:05:23 +08:00
|
|
|
+ if err == nil {
|
|
|
|
|
+ return ws, nil
|
|
|
|
|
+ }
|
|
|
|
|
+ msg := fmt.Errorf("dial failed: %v, response Body is nil", err)
|
|
|
|
|
+ if resp != nil && resp.Body != nil {
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ // the websocket buffer size may not be large enough, which can cause a panic
|
|
|
|
|
+ if e := recover(); e != nil {
|
|
|
|
|
+ msg = fmt.Errorf("dial failed: %v, response panic %v", err, e)
|
|
|
|
|
+ }
|
|
|
|
|
+ resp.Body.Close()
|
|
|
|
|
+ }()
|
2021-02-03 15:04:00 +08:00
|
|
|
+ var body bytes.Buffer
|
|
|
|
|
+ body.ReadFrom(resp.Body)
|
2023-01-03 15:05:23 +08:00
|
|
|
+ msg = fmt.Errorf("dial failed: %v, response is: %v", err, body.String())
|
2021-02-03 15:04:00 +08:00
|
|
|
+ }
|
2023-01-03 15:05:23 +08:00
|
|
|
+ return nil, msg
|
2021-02-03 15:04:00 +08:00
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+type rwc struct {
|
|
|
|
|
+ c *websocket.Conn
|
|
|
|
|
+ index byte
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *rwc) Write(p []byte) (int, error) {
|
|
|
|
|
+ frame := make([]byte, len(p)+1)
|
|
|
|
|
+ frame[0] = byte(c.index)
|
|
|
|
|
+ copy(frame[1:], p)
|
|
|
|
|
+
|
|
|
|
|
+ err := c.c.WriteMessage(websocket.BinaryMessage, frame)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return 0, err
|
|
|
|
|
+ }
|
|
|
|
|
+ return len(p), nil
|
|
|
|
|
+}
|
|
|
|
|
--
|
2024-04-26 16:27:21 +08:00
|
|
|
2.34.1
|
2021-02-03 15:04:00 +08:00
|
|
|
|