kubernetes/0004-Add-an-option-for-aggregator.patch
zhangxiaoyu 742f1b97fe update to 1.24.0
Signed-off-by: zhangxiaoyu <zhangxiaoyu58@huawei.com>
2023-01-03 15:48:02 +08:00

285 lines
12 KiB
Diff

From 978dcb825f2808bd298bfbc6e9c91b1fd7859ba3 Mon Sep 17 00:00:00 2001
From: Di Jin <dxmhu@google.com>
Date: Thu, 1 Sep 2022 15:25:26 -0700
Subject: Add an option for aggregator
---
cmd/kube-apiserver/app/aggregator.go | 9 ++-
cmd/kube-apiserver/app/options/options.go | 9 ++-
.../app/options/options_test.go | 5 +-
.../pkg/util/proxy/upgradeaware.go | 27 +++++++
.../pkg/util/proxy/upgradeaware_test.go | 77 +++++++++++++++++++
.../pkg/apiserver/apiserver.go | 7 ++
.../pkg/apiserver/handler_proxy.go | 6 ++
7 files changed, 132 insertions(+), 8 deletions(-)
diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go
index 2466dcc2..9cf23362 100644
--- a/cmd/kube-apiserver/app/aggregator.go
+++ b/cmd/kube-apiserver/app/aggregator.go
@@ -111,10 +111,11 @@ func createAggregatorConfig(
SharedInformerFactory: externalInformers,
},
ExtraConfig: aggregatorapiserver.ExtraConfig{
- ProxyClientCertFile: commandOptions.ProxyClientCertFile,
- ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
- ServiceResolver: serviceResolver,
- ProxyTransport: proxyTransport,
+ ProxyClientCertFile: commandOptions.ProxyClientCertFile,
+ ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
+ ServiceResolver: serviceResolver,
+ ProxyTransport: proxyTransport,
+ RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
},
}
diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go
index 9672b187..794d0d9f 100644
--- a/cmd/kube-apiserver/app/options/options.go
+++ b/cmd/kube-apiserver/app/options/options.go
@@ -75,7 +75,8 @@ type ServerRunOptions struct {
ProxyClientCertFile string
ProxyClientKeyFile string
- EnableAggregatorRouting bool
+ EnableAggregatorRouting bool
+ AggregatorRejectForwardingRedirects bool
MasterCount int
EndpointReconcilerType string
@@ -131,7 +132,8 @@ func NewServerRunOptions() *ServerRunOptions {
},
HTTPTimeout: time.Duration(5) * time.Second,
},
- ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
+ ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
+ AggregatorRejectForwardingRedirects: true,
}
// Overwrite the default for storage data format.
@@ -243,6 +245,9 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.")
+ fs.BoolVar(&s.AggregatorRejectForwardingRedirects, "aggregator-reject-forwarding-redirect", s.AggregatorRejectForwardingRedirects,
+ "Aggregator reject forwarding redirect response back to client.")
+
fs.StringVar(&s.ServiceAccountSigningKeyFile, "service-account-signing-key-file", s.ServiceAccountSigningKeyFile, ""+
"Path to the file that contains the current private key of the service account token issuer. The issuer will sign issued ID tokens with this private key.")
diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go
index 26eb6a95..90e1f045 100644
--- a/cmd/kube-apiserver/app/options/options_test.go
+++ b/cmd/kube-apiserver/app/options/options_test.go
@@ -315,8 +315,9 @@ func TestAddFlags(t *testing.T) {
Traces: &apiserveroptions.TracingOptions{
ConfigFile: "/var/run/kubernetes/tracing_config.yaml",
},
- IdentityLeaseDurationSeconds: 3600,
- IdentityLeaseRenewIntervalSeconds: 10,
+ IdentityLeaseDurationSeconds: 3600,
+ IdentityLeaseRenewIntervalSeconds: 10,
+ AggregatorRejectForwardingRedirects: true,
}
if !reflect.DeepEqual(expected, s) {
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go
index f56c17ca..a3a14241 100644
--- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go
+++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go
@@ -83,6 +83,8 @@ type UpgradeAwareHandler struct {
MaxBytesPerSec int64
// Responder is passed errors that occur while setting up proxying.
Responder ErrorResponder
+ // RejectForwardingRedirects, if true, replaces any 3xx response from the backend with a 502 instead of forwarding it to the client.
+ RejectForwardingRedirects bool
}
const defaultFlushInterval = 200 * time.Millisecond
@@ -257,6 +259,31 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
+ if h.RejectForwardingRedirects {
+ oldModifyResponse := proxy.ModifyResponse
+ proxy.ModifyResponse = func(response *http.Response) error {
+ code := response.StatusCode
+ if code >= 300 && code <= 399 {
+ // close the original (redirect) response body; it is discarded, not forwarded
+ response.Body.Close()
+ msg := "the backend attempted to redirect this request, which is not permitted"
+ // replace the response; Status is built from the 502 constant because response.StatusCode still holds the 3xx code while this literal is evaluated
+ *response = http.Response{
+ StatusCode: http.StatusBadGateway,
+ Status: fmt.Sprintf("%d %s", http.StatusBadGateway, http.StatusText(http.StatusBadGateway)),
+ Body: io.NopCloser(strings.NewReader(msg)),
+ ContentLength: int64(len(msg)),
+ }
+ } else {
+ if oldModifyResponse != nil {
+ if err := oldModifyResponse(response); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+ }
+ }
if h.Responder != nil {
// if an optional error interceptor/responder was provided wire it
// the custom responder might be used for providing a unified error reporting
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go
index f57b69a0..0d77fb11 100644
--- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go
+++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go
@@ -704,6 +704,83 @@ func TestProxyUpgradeErrorResponse(t *testing.T) {
}
}
+func TestRejectForwardingRedirectsOption(t *testing.T) {
+ originalBody := []byte(`some data`) // payload the backend writes for every request
+ testCases := []struct { // each case sends one backend status through the proxy with the option on or off
+ name string
+ rejectForwardingRedirects bool // value assigned to the handler's RejectForwardingRedirects
+ serverStatusCode int // status code the backend writes
+ expectStatusCode int // status code expected in the response read from the proxy
+ expectBody []byte // body expected in the response read from the proxy
+ }{
+ {
+ name: "reject redirection enabled in proxy, backend server sending 200 response",
+ rejectForwardingRedirects: true,
+ serverStatusCode: 200,
+ expectStatusCode: 200,
+ expectBody: originalBody,
+ },
+ {
+ name: "reject redirection enabled in proxy, backend server sending 301 response",
+ rejectForwardingRedirects: true,
+ serverStatusCode: 301,
+ expectStatusCode: 502,
+ expectBody: []byte(`the backend attempted to redirect this request, which is not permitted`),
+ },
+ {
+ name: "reject redirection disabled in proxy, backend server sending 200 response",
+ rejectForwardingRedirects: false,
+ serverStatusCode: 200,
+ expectStatusCode: 200,
+ expectBody: originalBody,
+ },
+ {
+ name: "reject redirection disabled in proxy, backend server sending 301 response",
+ rejectForwardingRedirects: false,
+ serverStatusCode: 301,
+ expectStatusCode: 301,
+ expectBody: originalBody,
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Set up a backend server that replies with the case's status code followed by originalBody
+ backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(tc.serverStatusCode)
+ w.Write(originalBody)
+ }))
+ defer backendServer.Close()
+ backendServerURL, _ := url.Parse(backendServer.URL)
+
+ // Set up a proxy pointing to the backend, with the option under test applied to the handler
+ proxyHandler := NewUpgradeAwareHandler(backendServerURL, nil, false, false, &fakeResponder{t: t})
+ proxyHandler.RejectForwardingRedirects = tc.rejectForwardingRedirects
+ proxy := httptest.NewServer(proxyHandler)
+ defer proxy.Close()
+ proxyURL, _ := url.Parse(proxy.URL)
+
+ conn, err := net.Dial("tcp", proxyURL.Host) // raw TCP connection to the proxy; the request is written and the reply parsed by hand below
+ require.NoError(t, err)
+ bufferedReader := bufio.NewReader(conn)
+
+ req, _ := http.NewRequest("GET", proxyURL.String(), nil)
+ require.NoError(t, req.Write(conn))
+ // Verify the status code, body and Content-Length of the response read back from the proxy
+ resp, err := http.ReadResponse(bufferedReader, nil)
+ require.NoError(t, err)
+ assert.Equal(t, tc.expectStatusCode, resp.StatusCode)
+ data, err := ioutil.ReadAll(resp.Body)
+ require.NoError(t, err)
+ assert.Equal(t, tc.expectBody, data)
+ assert.Equal(t, int64(len(tc.expectBody)), resp.ContentLength)
+ resp.Body.Close()
+
+ // clean up the client connection (not reached if a require above fails)
+ conn.Close()
+ })
+ }
+}
+
func TestDefaultProxyTransport(t *testing.T) {
tests := []struct {
name,
diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
index e945fb48..133887e1 100644
--- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
+++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
@@ -87,6 +87,8 @@ type ExtraConfig struct {
// Mechanism by which the Aggregator will resolve services. Required.
ServiceResolver ServiceResolver
+
+ RejectForwardingRedirects bool
}
// Config represents the configuration needed to create an APIAggregator.
@@ -156,6 +158,9 @@ type APIAggregator struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector
+
+ // rejectForwardingRedirects is whether to reject (rather than forward) redirect responses from the backend
+ rejectForwardingRedirects bool
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
@@ -213,6 +218,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
+ rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
}
// used later to filter the served resource by those that have expired.
@@ -443,6 +449,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
proxyTransport: s.proxyTransport,
serviceResolver: s.serviceResolver,
egressSelector: s.egressSelector,
+ rejectForwardingRedirects: s.rejectForwardingRedirects,
}
proxyHandler.updateAPIService(apiService)
if s.openAPIAggregationController != nil {
diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go
index 3a880b6b..e1282f2a 100644
--- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go
+++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go
@@ -68,6 +68,9 @@ type proxyHandler struct {
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector
+
+ // rejectForwardingRedirects is whether to reject (rather than forward) redirect responses from the backend
+ rejectForwardingRedirects bool
}
type proxyHandlingInfo struct {
@@ -172,6 +175,9 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
+ if r.rejectForwardingRedirects {
+ handler.RejectForwardingRedirects = true
+ }
utilflowcontrol.RequestDelegated(req.Context())
handler.ServeHTTP(w, newReq)
}
--
2.25.1