!7 kubelet support exec websocket protocol
From: @gaohuatao Reviewed-by: @caihaomin Signed-off-by: @caihaomin
This commit is contained in:
commit
818e4b58d1
266
0001-kubelet-support-exec-websocket-protocol.patch
Normal file
266
0001-kubelet-support-exec-websocket-protocol.patch
Normal file
@ -0,0 +1,266 @@
|
||||
From ac8fda0c77cb588f59aff7c86c05933a7a2d77c4 Mon Sep 17 00:00:00 2001
|
||||
From: gaohuatao <gaohuatao@huawei.com>
|
||||
Date: Wed, 3 Feb 2021 14:59:37 +0800
|
||||
Subject: [PATCH] kubelet support exec websocket protocol
|
||||
|
||||
Signed-off-by: gaohuatao <gaohuatao@huawei.com>
|
||||
---
|
||||
.../cri/streaming/remotecommand/proxy.go | 197 ++++++++++++++++++
|
||||
pkg/kubelet/server/server.go | 21 +-
|
||||
2 files changed, 214 insertions(+), 4 deletions(-)
|
||||
create mode 100644 pkg/kubelet/cri/streaming/remotecommand/proxy.go
|
||||
|
||||
diff --git a/pkg/kubelet/cri/streaming/remotecommand/proxy.go b/pkg/kubelet/cri/streaming/remotecommand/proxy.go
|
||||
new file mode 100644
|
||||
index 00000000..5b99747c
|
||||
--- /dev/null
|
||||
+++ b/pkg/kubelet/cri/streaming/remotecommand/proxy.go
|
||||
@@ -0,0 +1,197 @@
|
||||
+package remotecommand
|
||||
+
|
||||
+import (
|
||||
+ "bytes"
|
||||
+ "errors"
|
||||
+ "fmt"
|
||||
+ "io"
|
||||
+ "net/http"
|
||||
+ "net/url"
|
||||
+ "strings"
|
||||
+ "time"
|
||||
+
|
||||
+ "github.com/gorilla/websocket"
|
||||
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
+ "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
+ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
+ "k8s.io/apimachinery/pkg/util/runtime"
|
||||
+ "k8s.io/klog/v2"
|
||||
+ "k8s.io/kubernetes/staging/src/k8s.io/client-go/util/exec"
|
||||
+)
|
||||
+
|
||||
+var (
|
||||
+ streamIdleTimeout = 4 * time.Hour
|
||||
+ streamCreationTimeout = remotecommandconsts.DefaultStreamCreationTimeout
|
||||
+)
|
||||
+
|
||||
+// proxyStreamToWebSocket proxies stream to url with websocket.
|
||||
+func ProxyToWebSocket(w http.ResponseWriter, r *http.Request, url *url.URL, opts *Options) {
|
||||
+ klog.V(8).Infof("start proxy request to websocket %+v", r)
|
||||
+ ctx, ok := createStreams(
|
||||
+ r,
|
||||
+ w,
|
||||
+ opts,
|
||||
+ remotecommandconsts.SupportedStreamingProtocols,
|
||||
+ streamIdleTimeout,
|
||||
+ streamCreationTimeout)
|
||||
+ if !ok {
|
||||
+ msg := "failed to create stream to fontend"
|
||||
+ klog.Error(msg)
|
||||
+ http.Error(w, msg, http.StatusInternalServerError)
|
||||
+ return
|
||||
+ }
|
||||
+ defer func() {
|
||||
+ if err := ctx.conn.Close(); err != nil {
|
||||
+ klog.Errorf("failed to close connection, %v", err)
|
||||
+ }
|
||||
+ }()
|
||||
+
|
||||
+ klog.V(8).Infof("start connecting to websocket %s", url.String())
|
||||
+ backendConn, err := connectBackend(url.String(), "channel.k8s.io", r)
|
||||
+ if err != nil {
|
||||
+ msg := fmt.Sprintf("connectBackend failed: %v", err)
|
||||
+ klog.Error(msg)
|
||||
+ http.Error(w, msg, http.StatusInternalServerError)
|
||||
+ return
|
||||
+ }
|
||||
+ defer backendConn.Close()
|
||||
+
|
||||
+ var errConnection error
|
||||
+ frontendStdinToBackendComplete := make(chan struct{})
|
||||
+ frontendResizeToBackendComplete := make(chan struct{})
|
||||
+ backendToFrontendComplete := make(chan struct{})
|
||||
+
|
||||
+ go func() {
|
||||
+ for {
|
||||
+ _, msg, err := backendConn.ReadMessage()
|
||||
+ if err != nil {
|
||||
+ e, ok := err.(*websocket.CloseError)
|
||||
+ if !ok || e.Code != websocket.CloseNormalClosure {
|
||||
+ errConnection = err
|
||||
+ }
|
||||
+ break
|
||||
+ }
|
||||
+
|
||||
+ if len(msg) < 1 {
|
||||
+ errConnection = fmt.Errorf("received err msg from backEnd (the length less than 1), msg: %s", string(msg))
|
||||
+ break
|
||||
+ }
|
||||
+
|
||||
+ switch msg[0] {
|
||||
+ case stdoutChannel:
|
||||
+ _, err = ctx.stdoutStream.Write(msg[1:])
|
||||
+ case stderrChannel:
|
||||
+ _, err = ctx.stderrStream.Write(msg[1:])
|
||||
+ case errorChannel:
|
||||
+ err = ctx.writeStatus(apierrors.NewInternalError(errors.New(string(msg[1:]))))
|
||||
+ default:
|
||||
+ err = fmt.Errorf("received invalid msg from backEnd, msg: %s", string(msg))
|
||||
+ }
|
||||
+
|
||||
+ if err != nil {
|
||||
+ errConnection = err
|
||||
+ break
|
||||
+ }
|
||||
+ }
|
||||
+ close(backendToFrontendComplete)
|
||||
+ }()
|
||||
+
|
||||
+ if opts.Stdin {
|
||||
+ go func() {
|
||||
+ r := &rwc{
|
||||
+ c: backendConn,
|
||||
+ index: stdinChannel,
|
||||
+ }
|
||||
+ _, err := io.Copy(r, ctx.stdinStream)
|
||||
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
+ errConnection = fmt.Errorf("copy data from frontend(stdinStream) to backend failed, err: %v", err)
|
||||
+ }
|
||||
+ close(frontendStdinToBackendComplete)
|
||||
+ }()
|
||||
+ }
|
||||
+
|
||||
+ if opts.TTY {
|
||||
+ go func() {
|
||||
+ r := &rwc{
|
||||
+ c: backendConn,
|
||||
+ index: resizeChannel,
|
||||
+ }
|
||||
+ _, err := io.Copy(r, ctx.resizeStream)
|
||||
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
+ errConnection = fmt.Errorf("copy data from frontend(resizeStream) to backend failed, err: %v", err)
|
||||
+ }
|
||||
+ close(frontendResizeToBackendComplete)
|
||||
+ }()
|
||||
+ }
|
||||
+ select {
|
||||
+ case <-backendToFrontendComplete:
|
||||
+ case <-frontendStdinToBackendComplete:
|
||||
+ case <-frontendResizeToBackendComplete:
|
||||
+ }
|
||||
+
|
||||
+ if errConnection != nil {
|
||||
+ klog.Errorf("SpdyProxy: the connection disconnected: %v", errConnection)
|
||||
+ if exitErr, ok := errConnection.(exec.ExitError); ok && exitErr.Exited() {
|
||||
+ rc := exitErr.ExitStatus()
|
||||
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{
|
||||
+ Status: v1.StatusFailure,
|
||||
+ Reason: remotecommandconsts.NonZeroExitCodeReason,
|
||||
+ Details: &v1.StatusDetails{
|
||||
+ Causes: []v1.StatusCause{
|
||||
+ {
|
||||
+ Type: remotecommandconsts.ExitCodeCauseType,
|
||||
+ Message: fmt.Sprintf("%d", rc),
|
||||
+ },
|
||||
+ },
|
||||
+ },
|
||||
+ Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
|
||||
+ }})
|
||||
+ } else if closeErr, ok := errConnection.(*websocket.CloseError); !ok || closeErr.Text != io.ErrUnexpectedEOF.Error() {
|
||||
+ //ignore this ErrUnexpectedEOF because isulad always close the connection while reading a frame
|
||||
+ err = fmt.Errorf("error executing command in container: %v", errConnection)
|
||||
+ runtime.HandleError(err)
|
||||
+ ctx.writeStatus(apierrors.NewInternalError(err))
|
||||
+ }
|
||||
+ } else {
|
||||
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{
|
||||
+ Status: v1.StatusSuccess,
|
||||
+ }})
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
+func connectBackend(addr, subprotocol string, r *http.Request) (*websocket.Conn, error) {
|
||||
+ h := http.Header{}
|
||||
+ originHeadValue := r.Header.Get("Origin")
|
||||
+ if originHeadValue != "" {
|
||||
+ h["Origin"] = []string{originHeadValue}
|
||||
+ }
|
||||
+ websocket.DefaultDialer.Subprotocols = []string{subprotocol}
|
||||
+ websocket.DefaultDialer.ReadBufferSize = 128 * 1024
|
||||
+ websocket.DefaultDialer.WriteBufferSize = 128 * 1024
|
||||
+ ws, resp, err := websocket.DefaultDialer.Dial(addr, h)
|
||||
+ if err != nil {
|
||||
+ var body bytes.Buffer
|
||||
+ body.ReadFrom(resp.Body)
|
||||
+ defer resp.Body.Close()
|
||||
+ msg := fmt.Errorf("dial failed: %v, response is: %v", err, body.String())
|
||||
+ return nil, msg
|
||||
+ }
|
||||
+ return ws, nil
|
||||
+}
|
||||
+
|
||||
+type rwc struct {
|
||||
+ c *websocket.Conn
|
||||
+ index byte
|
||||
+}
|
||||
+
|
||||
+func (c *rwc) Write(p []byte) (int, error) {
|
||||
+ frame := make([]byte, len(p)+1)
|
||||
+ frame[0] = byte(c.index)
|
||||
+ copy(frame[1:], p)
|
||||
+
|
||||
+ err := c.c.WriteMessage(websocket.BinaryMessage, frame)
|
||||
+ if err != nil {
|
||||
+ return 0, err
|
||||
+ }
|
||||
+ return len(p), nil
|
||||
+}
|
||||
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
|
||||
index 2b033e2c..1d19fed6 100644
|
||||
--- a/pkg/kubelet/server/server.go
|
||||
+++ b/pkg/kubelet/server/server.go
|
||||
@@ -782,26 +782,39 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
|
||||
|
||||
// getExec handles requests to run a command inside a container.
|
||||
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
||||
- params := getExecRequestParams(request)
|
||||
streamOpts, err := remotecommandserver.NewOptions(request.Request)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
response.WriteError(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
+ url, err := s.getExecUrl(request, response, streamOpts)
|
||||
+ if err != nil {
|
||||
+ klog.Errorf("failed to get backend url %v", err)
|
||||
+ return
|
||||
+ }
|
||||
+ if url.Scheme == "ws" || url.Scheme == "wss" {
|
||||
+ remotecommandserver.ProxyToWebSocket(response.ResponseWriter, request.Request, url, streamOpts)
|
||||
+ } else {
|
||||
+ proxyStream(response.ResponseWriter, request.Request, url)
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
+func (s *Server) getExecUrl(request *restful.Request, response *restful.Response, streamOpts *remotecommandserver.Options) (*url.URL, error) {
|
||||
+ params := getExecRequestParams(request)
|
||||
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
||||
if !ok {
|
||||
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||
- return
|
||||
+ return nil, fmt.Errorf("pod not found")
|
||||
}
|
||||
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
|
||||
if err != nil {
|
||||
streaming.WriteError(err, response.ResponseWriter)
|
||||
- return
|
||||
+ return nil, err
|
||||
}
|
||||
- proxyStream(response.ResponseWriter, request.Request, url)
|
||||
+ return url, nil
|
||||
}
|
||||
|
||||
// getRun handles requests to run a command inside a container.
|
||||
--
|
||||
2.20.1
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
|
||||
Name: kubernetes
|
||||
Version: 1.20.2
|
||||
Release: 1
|
||||
Release: 2
|
||||
Summary: Container cluster management
|
||||
License: ASL 2.0
|
||||
URL: https://k8s.io/kubernetes
|
||||
@ -24,6 +24,8 @@ Source13: kubernetes-accounting.conf
|
||||
Source14: kubeadm.conf
|
||||
Source15: kubernetes.conf
|
||||
|
||||
Patch6000: 0001-kubelet-support-exec-websocket-protocol.patch
|
||||
|
||||
%description
|
||||
Container cluster management.
|
||||
|
||||
@ -83,7 +85,7 @@ Summary: Help documents for kubernetes
|
||||
Help documents for kubernetes.
|
||||
|
||||
%prep
|
||||
%setup -q -n kubernetes-1.20.2
|
||||
%autosetup -n kubernetes-1.20.2 -Sgit -p1
|
||||
mkdir -p src/k8s.io/kubernetes
|
||||
mv $(ls | grep -v "^src$") src/k8s.io/kubernetes/.
|
||||
|
||||
@ -252,6 +254,9 @@ getent passwd kube >/dev/null || useradd -r -g kube -d / -s /sbin/nologin \
|
||||
%systemd_postun kubelet kube-proxy
|
||||
|
||||
%changelog
|
||||
* Thu Feb 2 2021 gaohuatao <gaohuatao@huawei.com> - 1.20.2-2
|
||||
- Add kubelet support ws
|
||||
|
||||
* Fri Jan 20 2021 lixiang <lixiang172@huawei.com> - 1.20.2-1
|
||||
- Bump version to v1.20.2
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user