KubeOS/0015-proxy-Add-unit-tests-and-delete-useless-dependencies.patch
Yuhang Wei f2256bb8f5 sync branch to openEuler-22.03-LTS-SP1
Signed-off-by: Yuhang Wei <weiyuhang3@huawei.com>
(cherry picked from commit 9186039f774168dfbacef04dac8ee56356149736)
2024-03-13 17:11:26 +08:00

1012 lines
42 KiB
Diff

From 2e05a5ce397b1ad3b889e200ee278f9efb5276f8 Mon Sep 17 00:00:00 2001
From: liyuanr <liyuanrong1@huawei.com>
Date: Tue, 30 Jan 2024 15:04:13 +0800
Subject: [PATCH 2/4] proxy: Add unit tests and delete useless dependencies
from Cargo.toml
Add proxy unit tests, and modify the design of the AgentCallClient,
AgentClient and ProxyController structs to make
agent invocations easier to mock.
Delete useless dependencies from Cargo.toml and Cargo.lock.
Signed-off-by: liyuanr <liyuanrong1@huawei.com>
---
KubeOS-Rust/Cargo.lock | 14 --
KubeOS-Rust/proxy/Cargo.toml | 3 +-
.../proxy/src/controller/agentclient.rs | 74 ++++------
.../proxy/src/controller/apiserver_mock.rs | 137 +++++++++++++++---
.../proxy/src/controller/controller.rs | 121 ++++++++--------
KubeOS-Rust/proxy/src/controller/mod.rs | 2 +-
KubeOS-Rust/proxy/src/controller/utils.rs | 12 +-
KubeOS-Rust/proxy/src/drain.rs | 24 +--
KubeOS-Rust/proxy/src/main.rs | 9 +-
9 files changed, 232 insertions(+), 164 deletions(-)
diff --git a/KubeOS-Rust/Cargo.lock b/KubeOS-Rust/Cargo.lock
index c44c152..2342c7b 100644
--- a/KubeOS-Rust/Cargo.lock
+++ b/KubeOS-Rust/Cargo.lock
@@ -1180,18 +1180,6 @@ dependencies = [
"syn 1.0.109",
]
-[[package]]
-name = "mockall_double"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7dffc15b97456ecc84d2bde8c1df79145e154f45225828c4361f676e1b82acd6"
-dependencies = [
- "cfg-if",
- "proc-macro2",
- "quote",
- "syn 1.0.109",
-]
-
[[package]]
name = "mockito"
version = "0.31.1"
@@ -1503,7 +1491,6 @@ dependencies = [
"anyhow",
"assert-json-diff",
"async-trait",
- "chrono",
"cli",
"env_logger",
"futures",
@@ -1515,7 +1502,6 @@ dependencies = [
"log",
"manager",
"mockall",
- "mockall_double",
"regex",
"reqwest",
"schemars",
diff --git a/KubeOS-Rust/proxy/Cargo.toml b/KubeOS-Rust/proxy/Cargo.toml
index fe657af..94e3b3c 100644
--- a/KubeOS-Rust/proxy/Cargo.toml
+++ b/KubeOS-Rust/proxy/Cargo.toml
@@ -17,7 +17,6 @@ path = "src/main.rs"
[dependencies]
anyhow = "1.0.44"
async-trait = "0.1"
-chrono = { version = "0.4", default-features = false, features = ["std"] }
cli = { version = "1.0.5", path = "../cli" }
env_logger = "0.9.0"
futures = "0.3.17"
@@ -45,4 +44,4 @@ http = "0.2.9"
hyper = "0.14.25"
tower-test = "0.4.0"
mockall = { version = "=0.11.3" }
-mockall_double = "0.2.1"
+
diff --git a/KubeOS-Rust/proxy/src/controller/agentclient.rs b/KubeOS-Rust/proxy/src/controller/agentclient.rs
index 73489a9..b833f27 100644
--- a/KubeOS-Rust/proxy/src/controller/agentclient.rs
+++ b/KubeOS-Rust/proxy/src/controller/agentclient.rs
@@ -22,13 +22,6 @@ use cli::{
};
use manager::api::{CertsInfo, ConfigureRequest, KeyInfo as AgentKeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest};
-#[cfg_attr(test, double)]
-use agent_call::AgentCallClient;
-#[cfg(test)]
-use mockall::automock;
-#[cfg(test)]
-use mockall_double::double;
-
pub struct UpgradeInfo {
pub version: String,
pub image_type: String,
@@ -57,45 +50,40 @@ pub struct KeyInfo {
pub operation: String,
}
-#[cfg_attr(test, automock)]
pub trait AgentMethod {
- fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo, agent_call: AgentCallClient) -> Result<(), Error>;
- fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error>;
- fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error>;
- fn configure_method(&self, config_info: ConfigInfo, agent_call: AgentCallClient) -> Result<(), Error>;
+ fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo) -> Result<(), Error>;
+ fn upgrade_method(&self) -> Result<(), Error>;
+ fn rollback_method(&self) -> Result<(), Error>;
+ fn configure_method(&self, config_info: ConfigInfo) -> Result<(), Error>;
}
-
-pub mod agent_call {
- use super::{Client, Error, RpcMethod};
- #[cfg(test)]
- use mockall::automock;
-
- #[derive(Default)]
- pub struct AgentCallClient {}
-
- #[cfg_attr(test, automock)]
- impl AgentCallClient {
- pub fn call_agent<T: RpcMethod + 'static>(&self, client: &Client, method: T) -> Result<(), Error> {
- match method.call(client) {
- Ok(_resp) => Ok(()),
- Err(e) => Err(Error::AgentError { source: e }),
- }
- }
- }
+pub trait AgentCall {
+ fn call_agent<T: RpcMethod + 'static>(&self, client: &Client, method: T) -> Result<(), Error>;
}
-pub struct AgentClient {
+pub struct AgentClient<T: AgentCall> {
pub agent_client: Client,
+ pub agent_call_client: T,
+}
+
+impl<T: AgentCall> AgentClient<T> {
+ pub fn new<P: AsRef<Path>>(socket_path: P, agent_call_client: T) -> Self {
+ AgentClient { agent_client: Client::new(socket_path), agent_call_client }
+ }
}
-impl AgentClient {
- pub fn new<P: AsRef<Path>>(socket_path: P) -> Self {
- AgentClient { agent_client: Client::new(socket_path) }
+#[derive(Default)]
+pub struct AgentCallClient {}
+impl AgentCall for AgentCallClient {
+ fn call_agent<T: RpcMethod + 'static>(&self, client: &Client, method: T) -> Result<(), Error> {
+ match method.call(client) {
+ Ok(_resp) => Ok(()),
+ Err(e) => Err(Error::AgentError { source: e }),
+ }
}
}
-impl AgentMethod for AgentClient {
- fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo, agent_call: AgentCallClient) -> Result<(), Error> {
+impl<T: AgentCall> AgentMethod for AgentClient<T> {
+ fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo) -> Result<(), Error> {
let upgrade_request = UpgradeRequest {
version: upgrade_info.version,
image_type: upgrade_info.image_type,
@@ -110,27 +98,27 @@ impl AgentMethod for AgentClient {
client_key: upgrade_info.clientkey,
},
};
- match agent_call.call_agent(&self.agent_client, PrepareUpgradeMethod::new(upgrade_request)) {
+ match self.agent_call_client.call_agent(&self.agent_client, PrepareUpgradeMethod::new(upgrade_request)) {
Ok(_resp) => Ok(()),
Err(e) => Err(e),
}
}
- fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error> {
- match agent_call.call_agent(&self.agent_client, UpgradeMethod::default()) {
+ fn upgrade_method(&self) -> Result<(), Error> {
+ match self.agent_call_client.call_agent(&self.agent_client, UpgradeMethod::default()) {
Ok(_resp) => Ok(()),
Err(e) => Err(e),
}
}
- fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error> {
- match agent_call.call_agent(&self.agent_client, RollbackMethod::default()) {
+ fn rollback_method(&self) -> Result<(), Error> {
+ match self.agent_call_client.call_agent(&self.agent_client, RollbackMethod::default()) {
Ok(_resp) => Ok(()),
Err(e) => Err(e),
}
}
- fn configure_method(&self, config_info: ConfigInfo, agent_call: AgentCallClient) -> Result<(), Error> {
+ fn configure_method(&self, config_info: ConfigInfo) -> Result<(), Error> {
let mut agent_configs: Vec<AgentSysconfig> = Vec::new();
for config in config_info.configs {
let mut contents_tmp: HashMap<String, AgentKeyInfo> = HashMap::new();
@@ -147,7 +135,7 @@ impl AgentMethod for AgentClient {
})
}
let config_request = ConfigureRequest { configs: agent_configs };
- match agent_call.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) {
+ match self.agent_call_client.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) {
Ok(_resp) => Ok(()),
Err(e) => Err(e),
}
diff --git a/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs b/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs
index c46d26a..ef5977c 100644
--- a/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs
+++ b/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs
@@ -2,15 +2,20 @@ use self::mock_error::Error;
use super::{
agentclient::*,
crd::{Configs, OSInstanceStatus},
- values::{NODE_STATUS_CONFIG, NODE_STATUS_UPGRADE},
+ values::{NODE_STATUS_CONFIG, NODE_STATUS_UPGRADE, OPERATION_TYPE_ROLLBACK},
};
use crate::controller::{
apiclient::{ApplyApi, ControllerClient},
- crd::{OSInstance, OSInstanceSpec, OSSpec, OS},
+ crd::{Config, Content, OSInstance, OSInstanceSpec, OSSpec, OS},
values::{LABEL_OSINSTANCE, LABEL_UPGRADING, NODE_STATUS_IDLE},
ProxyController,
};
use anyhow::Result;
+use cli::client::Client;
+use cli::method::{
+ callable_method::RpcMethod, configure::ConfigureMethod, prepare_upgrade::PrepareUpgradeMethod,
+ rollback::RollbackMethod, upgrade::UpgradeMethod,
+};
use http::{Request, Response};
use hyper::{body::to_bytes, Body};
use k8s_openapi::api::core::v1::Pod;
@@ -19,7 +24,8 @@ use kube::{
api::ObjectMeta,
core::{ListMeta, ObjectList},
};
-use kube::{Client, Resource, ResourceExt};
+use kube::{Client as KubeClient, Resource, ResourceExt};
+use mockall::mock;
use std::collections::BTreeMap;
type ApiServerHandle = tower_test::mock::Handle<Request<Body>, Response<Body>>;
@@ -34,6 +40,7 @@ pub enum Testcases {
ConfigNormal(OSInstance),
ConfigVersionMismatchReassign(OSInstance),
ConfigVersionMismatchUpdate(OSInstance),
+ Rollback(OSInstance),
}
pub async fn timeout_after_5s(handle: tokio::task::JoinHandle<()>) {
@@ -59,7 +66,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_node_get(osi)
.await
- },
+ }
Testcases::UpgradeNormal(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -78,7 +85,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_node_pod_list_get(osi)
.await
- },
+ }
Testcases::UpgradeUpgradeconfigsVersionMismatch(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -97,7 +104,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_osinstance_patch_nodestatus_idle(osi)
.await
- },
+ }
Testcases::UpgradeOSInstaceNodestatusConfig(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -107,7 +114,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_node_get_with_label(osi.clone())
.await
- },
+ }
Testcases::UpgradeOSInstaceNodestatusIdle(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -123,7 +130,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_node_uncordon(osi)
.await
- },
+ }
Testcases::ConfigNormal(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -139,7 +146,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_osinstance_patch_nodestatus_idle(osi)
.await
- },
+ }
Testcases::ConfigVersionMismatchReassign(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -152,7 +159,7 @@ impl ApiServerVerifier {
.unwrap()
.handler_osinstance_patch_nodestatus_idle(osi)
.await
- },
+ }
Testcases::ConfigVersionMismatchUpdate(osi) => {
self.handler_osinstance_get_exist(osi.clone())
.await
@@ -165,7 +172,26 @@ impl ApiServerVerifier {
.unwrap()
.handler_osinstance_patch_spec_sysconfig_v2(osi)
.await
- },
+ }
+ Testcases::Rollback(osi) => {
+ self.handler_osinstance_get_exist(osi.clone())
+ .await
+ .unwrap()
+ .handler_osinstance_get_exist(osi.clone())
+ .await
+ .unwrap()
+ .handler_node_get_with_label(osi.clone())
+ .await
+ .unwrap()
+ .handler_osinstance_patch_upgradeconfig_v2(osi.clone())
+ .await
+ .unwrap()
+ .handler_node_cordon(osi.clone())
+ .await
+ .unwrap()
+ .handler_node_pod_list_get(osi)
+ .await
+ }
}
.expect("Case completed without errors");
})
@@ -437,6 +463,7 @@ impl ApiServerVerifier {
pub mod mock_error {
use thiserror::Error;
+
#[derive(Error, Debug)]
pub enum Error {
#[error("Kubernetes reported error: {source}")]
@@ -447,17 +474,27 @@ pub mod mock_error {
}
}
-impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> {
- pub fn test() -> (ProxyController<impl ApplyApi, impl AgentMethod>, ApiServerVerifier) {
+mock! {
+ pub AgentCallClient{}
+ impl AgentCall for AgentCallClient{
+ fn call_agent<T: RpcMethod + 'static>(&self, client:&Client, method: T) -> Result<(), agent_error::Error> {
+ Ok(())
+ }
+ }
+
+}
+impl<T: ApplyApi, U: AgentCall> ProxyController<T, U> {
+ pub fn test() -> (ProxyController<ControllerClient, MockAgentCallClient>, ApiServerVerifier) {
let (mock_service, handle) = tower_test::mock::pair::<Request<Body>, Response<Body>>();
- let mock_k8s_client = Client::new(mock_service, "default");
+ let mock_k8s_client = KubeClient::new(mock_service, "default");
let mock_api_client = ControllerClient::new(mock_k8s_client.clone());
- let mut mock_agent_client: MockAgentMethod = MockAgentMethod::new();
- mock_agent_client.expect_rollback_method().returning(|_x| Ok(()));
- mock_agent_client.expect_prepare_upgrade_method().returning(|_x, _y| Ok(()));
- mock_agent_client.expect_upgrade_method().returning(|_x| Ok(()));
- mock_agent_client.expect_configure_method().returning(|_x, _y| Ok(()));
- let proxy_controller: ProxyController<ControllerClient, MockAgentMethod> =
+ let mut mock_agent_call_client = MockAgentCallClient::new();
+ mock_agent_call_client.expect_call_agent::<UpgradeMethod>().returning(|_x, _y| Ok(()));
+ mock_agent_call_client.expect_call_agent::<PrepareUpgradeMethod>().returning(|_x, _y| Ok(()));
+ mock_agent_call_client.expect_call_agent::<RollbackMethod>().returning(|_x, _y| Ok(()));
+ mock_agent_call_client.expect_call_agent::<ConfigureMethod>().returning(|_x, _y| Ok(()));
+ let mock_agent_client = AgentClient::new("test", mock_agent_call_client);
+ let proxy_controller: ProxyController<ControllerClient, MockAgentCallClient> =
ProxyController::new(mock_k8s_client, mock_api_client, mock_agent_client);
(proxy_controller, ApiServerVerifier(handle))
}
@@ -495,7 +532,7 @@ impl OSInstance {
}
pub fn set_osi_nodestatus_config(node_name: &str, namespace: &str) -> Self {
- // return osinstance with nodestatus = upgrade, upgradeconfig.version=v1, sysconfig.version=v1
+ // return osinstance with nodestatus = config, upgradeconfig.version=v1, sysconfig.version=v1
let mut osinstance = OSInstance::set_osi_default(node_name, namespace);
osinstance.spec.nodestatus = NODE_STATUS_CONFIG.to_string();
osinstance
@@ -512,7 +549,18 @@ impl OSInstance {
// return osinstance with nodestatus = upgrade, upgradeconfig.version=v2, sysconfig.version=v1
let mut osinstance = OSInstance::set_osi_default(node_name, namespace);
osinstance.spec.nodestatus = NODE_STATUS_UPGRADE.to_string();
- osinstance.spec.upgradeconfigs.as_mut().unwrap().version = Some(String::from("v2"));
+ osinstance.spec.upgradeconfigs = Some(Configs {
+ version: Some(String::from("v2")),
+ configs: Some(vec![Config {
+ model: Some(String::from("kernel.sysctl.persist")),
+ configpath: Some(String::from("/persist/persist.conf")),
+ contents: Some(vec![Content {
+ key: Some(String::from("kernel.test")),
+ value: Some(String::from("test")),
+ operation: Some(String::from("delete")),
+ }]),
+ }]),
+ });
osinstance
}
@@ -520,7 +568,18 @@ impl OSInstance {
// return osinstance with nodestatus = upgrade, upgradeconfig.version=v2, sysconfig.version=v1
let mut osinstance = OSInstance::set_osi_default(node_name, namespace);
osinstance.spec.nodestatus = NODE_STATUS_CONFIG.to_string();
- osinstance.spec.sysconfigs.as_mut().unwrap().version = Some(String::from("v2"));
+ osinstance.spec.sysconfigs = Some(Configs {
+ version: Some(String::from("v2")),
+ configs: Some(vec![Config {
+ model: Some(String::from("kernel.sysctl.persist")),
+ configpath: Some(String::from("/persist/persist.conf")),
+ contents: Some(vec![Content {
+ key: Some(String::from("kernel.test")),
+ value: Some(String::from("test")),
+ operation: Some(String::from("delete")),
+ }]),
+ }]),
+ });
osinstance
}
}
@@ -549,7 +608,37 @@ impl OS {
pub fn set_os_syscon_v2_opstype_config() -> Self {
let mut os = OS::set_os_default();
os.spec.opstype = String::from("config");
- os.spec.sysconfigs = Some(Configs { version: Some(String::from("v2")), configs: None });
+ os.spec.sysconfigs = Some(Configs {
+ version: Some(String::from("v2")),
+ configs: Some(vec![Config {
+ model: Some(String::from("kernel.sysctl.persist")),
+ configpath: Some(String::from("/persist/persist.conf")),
+ contents: Some(vec![Content {
+ key: Some(String::from("kernel.test")),
+ value: Some(String::from("test")),
+ operation: Some(String::from("delete")),
+ }]),
+ }]),
+ });
+ os
+ }
+
+ pub fn set_os_rollback_osversion_v2_upgradecon_v2() -> Self {
+ let mut os = OS::set_os_default();
+ os.spec.osversion = String::from("KubeOS v2");
+ os.spec.opstype = OPERATION_TYPE_ROLLBACK.to_string();
+ os.spec.upgradeconfigs = Some(Configs {
+ version: Some(String::from("v2")),
+ configs: Some(vec![Config {
+ model: Some(String::from("kernel.sysctl.persist")),
+ configpath: Some(String::from("/persist/persist.conf")),
+ contents: Some(vec![Content {
+ key: Some(String::from("kernel.test")),
+ value: Some(String::from("test")),
+ operation: Some(String::from("delete")),
+ }]),
+ }]),
+ });
os
}
}
diff --git a/KubeOS-Rust/proxy/src/controller/controller.rs b/KubeOS-Rust/proxy/src/controller/controller.rs
index b2bb332..c21f304 100644
--- a/KubeOS-Rust/proxy/src/controller/controller.rs
+++ b/KubeOS-Rust/proxy/src/controller/controller.rs
@@ -24,10 +24,8 @@ use kube::{
use log::{debug, error, info};
use reconciler_error::Error;
-#[cfg_attr(test, double)]
-use super::agentclient::agent_call::AgentCallClient;
use super::{
- agentclient::{AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo},
+ agentclient::{AgentCall, AgentClient, AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo},
apiclient::ApplyApi,
crd::{Configs, Content, OSInstance, OS},
utils::{check_version, get_config_version, ConfigOperation, ConfigType},
@@ -36,12 +34,10 @@ use super::{
REQUEUE_ERROR, REQUEUE_NORMAL,
},
};
-#[cfg(test)]
-use mockall_double::double;
-pub async fn reconcile(
+pub async fn reconcile<T: ApplyApi, U: AgentCall>(
os: OS,
- ctx: Context<ProxyController<impl ApplyApi, impl AgentMethod>>,
+ ctx: Context<ProxyController<T, U>>,
) -> Result<ReconcilerAction, Error> {
debug!("start reconcile");
let proxy_controller = ctx.get_ref();
@@ -76,7 +72,7 @@ pub async fn reconcile(
)
.await?;
return Ok(REQUEUE_NORMAL);
- },
+ }
ConfigOperation::UpdateConfig => {
debug!("start update config");
osinstance.spec.sysconfigs = os_cr.spec.sysconfigs.clone();
@@ -85,8 +81,8 @@ pub async fn reconcile(
.update_osinstance_spec(&osinstance.name(), &namespace, &osinstance.spec)
.await?;
return Ok(REQUEUE_ERROR);
- },
- _ => {},
+ }
+ _ => {}
}
proxy_controller.set_config(&mut osinstance, ConfigType::SysConfig).await?;
proxy_controller
@@ -108,8 +104,8 @@ pub async fn reconcile(
)
.await?;
return Ok(REQUEUE_NORMAL);
- },
- _ => {},
+ }
+ _ => {}
}
if node.labels().contains_key(LABEL_UPGRADING) {
if osinstance.spec.nodestatus == NODE_STATUS_IDLE {
@@ -133,9 +129,9 @@ pub async fn reconcile(
Ok(REQUEUE_NORMAL)
}
-pub fn error_policy(
+pub fn error_policy<T: ApplyApi, U: AgentCall>(
error: &Error,
- _ctx: Context<ProxyController<impl ApplyApi, impl AgentMethod>>,
+ _ctx: Context<ProxyController<T, U>>,
) -> ReconcilerAction {
error!("Reconciliation error:{}", error.to_string());
REQUEUE_ERROR
@@ -145,31 +141,31 @@ struct ControllerResources {
osinstance: OSInstance,
node: Node,
}
-pub struct ProxyController<T: ApplyApi, U: AgentMethod> {
+pub struct ProxyController<T: ApplyApi, U: AgentCall> {
k8s_client: Client,
controller_client: T,
- agent_client: U,
+ agent_client: AgentClient<U>,
}
-impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> {
- pub fn new(k8s_client: Client, controller_client: T, agent_client: U) -> Self {
+impl<T: ApplyApi, U: AgentCall> ProxyController<T, U> {
+ pub fn new(k8s_client: Client, controller_client: T, agent_client: AgentClient<U>) -> Self {
ProxyController { k8s_client, controller_client, agent_client }
}
}
-impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> {
+impl<T: ApplyApi, U: AgentCall> ProxyController<T, U> {
async fn check_osi_exisit(&self, namespace: &str, node_name: &str) -> Result<(), Error> {
let osi_api: Api<OSInstance> = Api::namespaced(self.k8s_client.clone(), namespace);
match osi_api.get(node_name).await {
Ok(osi) => {
debug!("osinstance is exist {:?}", osi.name());
return Ok(());
- },
+ }
Err(kube::Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => {
info!("Create OSInstance {}", node_name);
self.controller_client.create_osinstance(node_name, namespace).await?;
Ok(())
- },
+ }
Err(err) => Err(Error::KubeError { source: err }),
}
}
@@ -257,17 +253,16 @@ impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> {
if config_info.need_config {
match config_info.configs.and_then(convert_to_agent_config) {
Some(agent_configs) => {
- let agent_call_client = AgentCallClient::default();
- match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }, agent_call_client) {
- Ok(_resp) => {},
+ match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }) {
+ Ok(_resp) => {}
Err(e) => {
return Err(Error::AgentError { source: e });
- },
+ }
}
- },
+ }
None => {
- info!("config is none, no need to config");
- },
+ info!("config is none, No content can be configured.");
+ }
};
self.update_osi_status(osinstance, config_type).await?;
}
@@ -290,35 +285,34 @@ impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> {
clientcert: os_cr.spec.clientcert.clone().unwrap_or_default(),
clientkey: os_cr.spec.clientkey.clone().unwrap_or_default(),
};
- let agent_call_client = AgentCallClient::default();
- match self.agent_client.prepare_upgrade_method(upgrade_info, agent_call_client) {
- Ok(_resp) => {},
+
+ match self.agent_client.prepare_upgrade_method(upgrade_info) {
+ Ok(_resp) => {}
Err(e) => {
return Err(Error::AgentError { source: e });
- },
+ }
}
self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?;
- let agent_call_client = AgentCallClient::default();
- match self.agent_client.upgrade_method(agent_call_client) {
- Ok(_resp) => {},
+ match self.agent_client.upgrade_method() {
+ Ok(_resp) => {}
Err(e) => {
return Err(Error::AgentError { source: e });
- },
+ }
}
- },
+ }
OPERATION_TYPE_ROLLBACK => {
self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?;
- let agent_call_client = AgentCallClient::default();
- match self.agent_client.rollback_method(agent_call_client) {
- Ok(_resp) => {},
+
+ match self.agent_client.rollback_method() {
+ Ok(_resp) => {}
Err(e) => {
return Err(Error::AgentError { source: e });
- },
+ }
}
- },
+ }
_ => {
return Err(Error::OperationError { value: os_cr.spec.opstype.clone() });
- },
+ }
}
Ok(())
}
@@ -329,12 +323,12 @@ impl<T: ApplyApi, U: AgentMethod> ProxyController<T, U> {
node_api.cordon(node_name).await?;
info!("Cordon node Successfully{}, start drain nodes", node_name);
match self.drain_node(node_name, evict_pod_force).await {
- Ok(()) => {},
+ Ok(()) => {}
Err(e) => {
node_api.uncordon(node_name).await?;
info!("Drain node {} error, uncordon node successfully", node_name);
return Err(e);
- },
+ }
}
Ok(())
}
@@ -360,7 +354,7 @@ fn convert_to_agent_config(configs: Configs) -> Option<Vec<Sysconfig>> {
contents: contents_tmp,
};
agent_configs.push(config_tmp)
- },
+ }
None => {
info!(
"model {} which has configpath {} do not has any contents no need to configure",
@@ -368,7 +362,7 @@ fn convert_to_agent_config(configs: Configs) -> Option<Vec<Sysconfig>> {
config.configpath.unwrap_or_default()
);
continue;
- },
+ }
};
}
if agent_configs.len() == 0 {
@@ -437,16 +431,13 @@ pub mod reconciler_error {
#[cfg(test)]
mod test {
use super::{error_policy, reconcile, Context, OSInstance, ProxyController, OS};
+ use crate::controller::apiserver_mock::{timeout_after_5s, MockAgentCallClient, Testcases};
use crate::controller::ControllerClient;
- use crate::controller::{
- agentclient::MockAgentMethod,
- apiserver_mock::{timeout_after_5s, Testcases},
- };
use std::env;
#[tokio::test]
async fn test_create_osinstance_with_no_upgrade_or_configuration() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_default();
let context = Context::new(test_proxy_controller);
@@ -457,7 +448,7 @@ mod test {
}
#[tokio::test]
async fn test_upgrade_normal() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_osversion_v2_upgradecon_v2();
let context = Context::new(test_proxy_controller);
@@ -471,7 +462,7 @@ mod test {
#[tokio::test]
async fn test_diff_osversion_opstype_config() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_osversion_v2_opstype_config();
let context = Context::new(test_proxy_controller);
@@ -488,7 +479,7 @@ mod test {
#[tokio::test]
async fn test_upgradeconfigs_version_mismatch() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_osversion_v2_upgradecon_v2();
let context = Context::new(test_proxy_controller);
@@ -501,7 +492,7 @@ mod test {
#[tokio::test]
async fn test_upgrade_nodestatus_idle() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_osversion_v2_upgradecon_v2();
let context = Context::new(test_proxy_controller);
@@ -513,7 +504,7 @@ mod test {
#[tokio::test]
async fn test_config_normal() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_syscon_v2_opstype_config();
let context = Context::new(test_proxy_controller);
@@ -525,7 +516,7 @@ mod test {
#[tokio::test]
async fn test_sysconfig_version_mismatch_reassign() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_syscon_v2_opstype_config();
let context = Context::new(test_proxy_controller);
@@ -539,7 +530,7 @@ mod test {
#[tokio::test]
async fn test_sysconfig_version_mismatch_update() {
- let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentMethod>::test();
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
env::set_var("NODE_NAME", "openeuler");
let os = OS::set_os_syscon_v2_opstype_config();
let context = Context::new(test_proxy_controller);
@@ -550,4 +541,16 @@ mod test {
reconcile(os, context.clone()).await.expect("reconciler");
timeout_after_5s(mocksrv).await;
}
+
+ #[tokio::test]
+ async fn test_rollback() {
+ let (test_proxy_controller, fakeserver) = ProxyController::<ControllerClient, MockAgentCallClient>::test();
+ env::set_var("NODE_NAME", "openeuler");
+ let os = OS::set_os_rollback_osversion_v2_upgradecon_v2();
+ let context = Context::new(test_proxy_controller);
+ let mocksrv = fakeserver
+ .run(Testcases::Rollback(OSInstance::set_osi_nodestatus_upgrade_upgradecon_v2("openeuler", "default")));
+ reconcile(os, context.clone()).await.expect("reconciler");
+ timeout_after_5s(mocksrv).await;
+ }
}
diff --git a/KubeOS-Rust/proxy/src/controller/mod.rs b/KubeOS-Rust/proxy/src/controller/mod.rs
index 73be45c..e30c8df 100644
--- a/KubeOS-Rust/proxy/src/controller/mod.rs
+++ b/KubeOS-Rust/proxy/src/controller/mod.rs
@@ -19,7 +19,7 @@ mod crd;
mod utils;
mod values;
-pub use agentclient::AgentClient;
+pub use agentclient::{AgentCallClient, AgentClient};
pub use apiclient::ControllerClient;
pub use controller::{error_policy, reconcile, reconciler_error::Error, ProxyController};
pub use crd::OS;
diff --git a/KubeOS-Rust/proxy/src/controller/utils.rs b/KubeOS-Rust/proxy/src/controller/utils.rs
index 78502db..0f56878 100644
--- a/KubeOS-Rust/proxy/src/controller/utils.rs
+++ b/KubeOS-Rust/proxy/src/controller/utils.rs
@@ -56,7 +56,7 @@ impl ConfigType {
);
return ConfigOperation::Reassign;
}
- },
+ }
ConfigType::SysConfig => {
let os_config_version = get_config_version(os.spec.sysconfigs.as_ref());
let osi_config_version = get_config_version(osinstance.spec.sysconfigs.as_ref());
@@ -78,7 +78,7 @@ impl ConfigType {
return ConfigOperation::UpdateConfig;
}
}
- },
+ }
};
ConfigOperation::DoNothing
}
@@ -96,7 +96,7 @@ impl ConfigType {
status_config_version = get_config_version(None);
}
configs = osinstance.spec.upgradeconfigs.clone();
- },
+ }
ConfigType::SysConfig => {
spec_config_version = get_config_version(osinstance.spec.sysconfigs.as_ref());
if let Some(osinstance_status) = osinstance.status.as_ref() {
@@ -105,7 +105,7 @@ impl ConfigType {
status_config_version = get_config_version(None);
}
configs = osinstance.spec.sysconfigs.clone();
- },
+ }
}
debug!(
"=======osinstance soec config version is {},status config version is {}",
@@ -127,7 +127,7 @@ impl ConfigType {
sysconfigs: None,
})
}
- },
+ }
ConfigType::SysConfig => {
if let Some(osi_status) = &mut osinstance.status {
osi_status.sysconfigs = osinstance.spec.sysconfigs.clone();
@@ -135,7 +135,7 @@ impl ConfigType {
osinstance.status =
Some(OSInstanceStatus { upgradeconfigs: None, sysconfigs: osinstance.spec.sysconfigs.clone() })
}
- },
+ }
}
}
}
diff --git a/KubeOS-Rust/proxy/src/drain.rs b/KubeOS-Rust/proxy/src/drain.rs
index 09cf662..72836f9 100644
--- a/KubeOS-Rust/proxy/src/drain.rs
+++ b/KubeOS-Rust/proxy/src/drain.rs
@@ -66,7 +66,7 @@ async fn get_pods_deleted(
Ok(pods @ ObjectList { .. }) => pods,
Err(err) => {
return Err(GetPodListsError { source: err, node_name: node_name.to_string() });
- },
+ }
};
let mut filterd_pods_list: Vec<Pod> = Vec::new();
let mut filterd_err: Vec<String> = Vec::new();
@@ -189,14 +189,14 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e
let name = (&p).name_any();
info!("Pod {} deleted.", name);
break;
- },
+ }
Ok(_) => {
info!("Pod '{}' is not yet deleted. Waiting {}s.", pod.name_any(), EVERY_DELETION_CHECK.as_secs_f64());
- },
+ }
Err(kube::Error::Api(e)) if e.code == response_error_not_found => {
info!("Pod {} is deleted.", pod.name_any());
break;
- },
+ }
Err(e) => {
error!(
"Get pod {} reported error: '{}', whether pod is deleted cannot be determined, waiting {}s.",
@@ -204,7 +204,7 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e
e,
EVERY_DELETION_CHECK.as_secs_f64()
);
- },
+ }
}
if start_time.elapsed() > TIMEOUT {
return Err(WaitDeletionError { pod_name: pod.name_any(), max_wait: TIMEOUT });
@@ -241,7 +241,7 @@ impl PodFilter for FinishedOrFailedFilter {
return match pod.status.as_ref() {
Some(PodStatus { phase: Some(phase), .. }) if phase == "Failed" || phase == "Succeeded" => {
FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay)
- },
+ }
_ => FilterResult::create_filter_result(false, "", PodDeleteStatus::Okay),
};
}
@@ -269,7 +269,7 @@ impl PodFilter for DaemonFilter {
let description = format!("Cannot drain Pod '{}': Pod is member of a DaemonSet", pod.name_any());
Box::new(FilterResult { result: false, desc: description, status: PodDeleteStatus::Error })
}
- },
+ }
_ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay),
};
}
@@ -287,7 +287,7 @@ impl PodFilter for MirrorFilter {
Some(annotations) if annotations.contains_key("kubernetes.io/config.mirror") => {
let description = format!("Ignore Pod '{}': Pod is a static Mirror Pod", pod.name_any());
FilterResult::create_filter_result(false, &description.to_string(), PodDeleteStatus::Warning)
- },
+ }
_ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay),
};
}
@@ -312,7 +312,7 @@ impl PodFilter for LocalStorageFilter {
let description = format!("Cannot drain Pod '{}': Pod has local Storage", pod.name_any());
Box::new(FilterResult { result: false, desc: description, status: PodDeleteStatus::Error })
}
- },
+ }
_ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay),
};
}
@@ -365,7 +365,7 @@ impl PodFilter for DeletedFilter {
&& now - Duration::from_secs(time.0.timestamp() as u64) >= self.delete_wait_timeout =>
{
FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay)
- },
+ }
_ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay),
};
}
@@ -471,7 +471,7 @@ impl ErrorHandleStrategy {
return match self {
Self::TolerateStrategy => {
return backoff.take(0);
- },
+ }
Self::RetryStrategy => backoff.take(MAX_RETRIES_TIMES),
};
@@ -488,7 +488,7 @@ impl tokio_retry::Condition<error::EvictionError> for ErrorHandleStrategy {
} else {
false
}
- },
+ }
}
}
}
diff --git a/KubeOS-Rust/proxy/src/main.rs b/KubeOS-Rust/proxy/src/main.rs
index cd601d0..ad36b64 100644
--- a/KubeOS-Rust/proxy/src/main.rs
+++ b/KubeOS-Rust/proxy/src/main.rs
@@ -20,7 +20,9 @@ use kube::{
};
use log::{error, info};
mod controller;
-use controller::{error_policy, reconcile, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH};
+use controller::{
+ error_policy, reconcile, AgentCallClient, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH,
+};
const PROXY_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
#[tokio::main]
@@ -29,14 +31,15 @@ async fn main() -> Result<()> {
let client = Client::try_default().await?;
let os: Api<OS> = Api::all(client.clone());
let controller_client = ControllerClient::new(client.clone());
- let agent_client = AgentClient::new(SOCK_PATH);
+ let agent_call_client = AgentCallClient::default();
+ let agent_client = AgentClient::new(SOCK_PATH, agent_call_client);
let proxy_controller = ProxyController::new(client, controller_client, agent_client);
info!("os-proxy version is {}, start renconcile", PROXY_VERSION.unwrap_or("Not Found"));
Controller::new(os, ListParams::default())
.run(reconcile, error_policy, Context::new(proxy_controller))
.for_each(|res| async move {
match res {
- Ok(_o) => {},
+ Ok(_o) => {}
Err(e) => error!("reconcile failed: {}", e.to_string()),
}
})
--
2.34.1