From 2e05a5ce397b1ad3b889e200ee278f9efb5276f8 Mon Sep 17 00:00:00 2001 From: liyuanr Date: Tue, 30 Jan 2024 15:04:13 +0800 Subject: [PATCH 2/4] proxy: Add unit tests and delete useless dependencies from Cargo.toml Add the proxy unit test ,modify the AgentCallClient , AgentClient and ProxyController struct design to make it easier to mock the agent invoking. Delete useless dependencies from Cargo.toml and Cargo.lock Signed-off-by: liyuanr --- KubeOS-Rust/Cargo.lock | 14 -- KubeOS-Rust/proxy/Cargo.toml | 3 +- .../proxy/src/controller/agentclient.rs | 74 ++++------ .../proxy/src/controller/apiserver_mock.rs | 137 +++++++++++++++--- .../proxy/src/controller/controller.rs | 121 ++++++++-------- KubeOS-Rust/proxy/src/controller/mod.rs | 2 +- KubeOS-Rust/proxy/src/controller/utils.rs | 12 +- KubeOS-Rust/proxy/src/drain.rs | 24 +-- KubeOS-Rust/proxy/src/main.rs | 9 +- 9 files changed, 232 insertions(+), 164 deletions(-) diff --git a/KubeOS-Rust/Cargo.lock b/KubeOS-Rust/Cargo.lock index c44c152..2342c7b 100644 --- a/KubeOS-Rust/Cargo.lock +++ b/KubeOS-Rust/Cargo.lock @@ -1180,18 +1180,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "mockall_double" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dffc15b97456ecc84d2bde8c1df79145e154f45225828c4361f676e1b82acd6" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "mockito" version = "0.31.1" @@ -1503,7 +1491,6 @@ dependencies = [ "anyhow", "assert-json-diff", "async-trait", - "chrono", "cli", "env_logger", "futures", @@ -1515,7 +1502,6 @@ dependencies = [ "log", "manager", "mockall", - "mockall_double", "regex", "reqwest", "schemars", diff --git a/KubeOS-Rust/proxy/Cargo.toml b/KubeOS-Rust/proxy/Cargo.toml index fe657af..94e3b3c 100644 --- a/KubeOS-Rust/proxy/Cargo.toml +++ b/KubeOS-Rust/proxy/Cargo.toml @@ -17,7 +17,6 @@ path = "src/main.rs" [dependencies] anyhow = "1.0.44" async-trait = "0.1" -chrono = { version = "0.4", default-features = false, features = ["std"] } cli = { version = "1.0.5", path = "../cli" } env_logger = "0.9.0" futures = "0.3.17" @@ -45,4 +44,4 @@ http = "0.2.9" hyper = "0.14.25" tower-test = "0.4.0" mockall = { version = "=0.11.3" } -mockall_double = "0.2.1" + diff --git a/KubeOS-Rust/proxy/src/controller/agentclient.rs b/KubeOS-Rust/proxy/src/controller/agentclient.rs index 73489a9..b833f27 100644 --- a/KubeOS-Rust/proxy/src/controller/agentclient.rs +++ b/KubeOS-Rust/proxy/src/controller/agentclient.rs @@ -22,13 +22,6 @@ use cli::{ }; use manager::api::{CertsInfo, ConfigureRequest, KeyInfo as AgentKeyInfo, Sysconfig as AgentSysconfig, UpgradeRequest}; -#[cfg_attr(test, double)] -use agent_call::AgentCallClient; -#[cfg(test)] -use mockall::automock; -#[cfg(test)] -use mockall_double::double; - pub struct UpgradeInfo { pub version: String, pub image_type: String, @@ -57,45 +50,40 @@ pub struct KeyInfo { pub operation: String, } -#[cfg_attr(test, automock)] pub trait AgentMethod { - fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo, agent_call: AgentCallClient) -> Result<(), Error>; - fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; - fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error>; - fn configure_method(&self, config_info: ConfigInfo, agent_call: AgentCallClient) -> Result<(), Error>; + fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo) -> Result<(), Error>; + fn upgrade_method(&self) -> Result<(), Error>; + fn rollback_method(&self) -> Result<(), Error>; + fn configure_method(&self, config_info: ConfigInfo) -> Result<(), Error>; } - -pub mod agent_call { - use super::{Client, Error, RpcMethod}; - #[cfg(test)] - use mockall::automock; - - #[derive(Default)] - pub struct AgentCallClient {} - - #[cfg_attr(test, automock)] - impl AgentCallClient { - pub fn call_agent(&self, client: &Client, method: T) -> Result<(), Error> { - match method.call(client) { - Ok(_resp) => Ok(()), - Err(e) => Err(Error::AgentError { source: e }), - } - } - } +pub trait AgentCall { + fn call_agent(&self, client: &Client, method: T) -> Result<(), Error>; } -pub struct AgentClient { +pub struct AgentClient { pub agent_client: Client, + pub agent_call_client: T, +} + +impl AgentClient { + pub fn new>(socket_path: P, agent_call_client: T) -> Self { + AgentClient { agent_client: Client::new(socket_path), agent_call_client } + } } -impl AgentClient { - pub fn new>(socket_path: P) -> Self { - AgentClient { agent_client: Client::new(socket_path) } +#[derive(Default)] +pub struct AgentCallClient {} +impl AgentCall for AgentCallClient { + fn call_agent(&self, client: &Client, method: T) -> Result<(), Error> { + match method.call(client) { + Ok(_resp) => Ok(()), + Err(e) => Err(Error::AgentError { source: e }), + } } } -impl AgentMethod for AgentClient { - fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo, agent_call: AgentCallClient) -> Result<(), Error> { +impl AgentMethod for AgentClient { + fn prepare_upgrade_method(&self, upgrade_info: UpgradeInfo) -> Result<(), Error> { let upgrade_request = UpgradeRequest { version: upgrade_info.version, image_type: upgrade_info.image_type, @@ -110,27 +98,27 @@ impl AgentMethod for AgentClient { client_key: upgrade_info.clientkey, }, }; - match agent_call.call_agent(&self.agent_client, PrepareUpgradeMethod::new(upgrade_request)) { + match self.agent_call_client.call_agent(&self.agent_client, PrepareUpgradeMethod::new(upgrade_request)) { Ok(_resp) => Ok(()), Err(e) => Err(e), } } - fn upgrade_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { - match agent_call.call_agent(&self.agent_client, UpgradeMethod::default()) { + fn upgrade_method(&self) -> Result<(), Error> { + match self.agent_call_client.call_agent(&self.agent_client, UpgradeMethod::default()) { Ok(_resp) => Ok(()), Err(e) => Err(e), } } - fn rollback_method(&self, agent_call: AgentCallClient) -> Result<(), Error> { - match agent_call.call_agent(&self.agent_client, RollbackMethod::default()) { + fn rollback_method(&self) -> Result<(), Error> { + match self.agent_call_client.call_agent(&self.agent_client, RollbackMethod::default()) { Ok(_resp) => Ok(()), Err(e) => Err(e), } } - fn configure_method(&self, config_info: ConfigInfo, agent_call: AgentCallClient) -> Result<(), Error> { + fn configure_method(&self, config_info: ConfigInfo) -> Result<(), Error> { let mut agent_configs: Vec = Vec::new(); for config in config_info.configs { let mut contents_tmp: HashMap = HashMap::new(); @@ -147,7 +135,7 @@ impl AgentMethod for AgentClient { }) } let config_request = ConfigureRequest { configs: agent_configs }; - match agent_call.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) { + match self.agent_call_client.call_agent(&self.agent_client, ConfigureMethod::new(config_request)) { Ok(_resp) => Ok(()), Err(e) => Err(e), } diff --git a/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs b/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs index c46d26a..ef5977c 100644 --- a/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs +++ b/KubeOS-Rust/proxy/src/controller/apiserver_mock.rs @@ -2,15 +2,20 @@ use self::mock_error::Error; use super::{ agentclient::*, crd::{Configs, OSInstanceStatus}, - values::{NODE_STATUS_CONFIG, NODE_STATUS_UPGRADE}, + values::{NODE_STATUS_CONFIG, NODE_STATUS_UPGRADE, OPERATION_TYPE_ROLLBACK}, }; use crate::controller::{ apiclient::{ApplyApi, ControllerClient}, - crd::{OSInstance, OSInstanceSpec, OSSpec, OS}, + crd::{Config, Content, OSInstance, OSInstanceSpec, OSSpec, OS}, values::{LABEL_OSINSTANCE, LABEL_UPGRADING, NODE_STATUS_IDLE}, ProxyController, }; use anyhow::Result; +use cli::client::Client; +use cli::method::{ + callable_method::RpcMethod, configure::ConfigureMethod, prepare_upgrade::PrepareUpgradeMethod, + rollback::RollbackMethod, upgrade::UpgradeMethod, +}; use http::{Request, Response}; use hyper::{body::to_bytes, Body}; use k8s_openapi::api::core::v1::Pod; @@ -19,7 +24,8 @@ use kube::{ api::ObjectMeta, core::{ListMeta, ObjectList}, }; -use kube::{Client, Resource, ResourceExt}; +use kube::{Client as KubeClient, Resource, ResourceExt}; +use mockall::mock; use std::collections::BTreeMap; type ApiServerHandle = tower_test::mock::Handle, Response>; @@ -34,6 +40,7 @@ pub enum Testcases { ConfigNormal(OSInstance), ConfigVersionMismatchReassign(OSInstance), ConfigVersionMismatchUpdate(OSInstance), + Rollback(OSInstance), } pub async fn timeout_after_5s(handle: tokio::task::JoinHandle<()>) { @@ -59,7 +66,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_get(osi) .await - }, + } Testcases::UpgradeNormal(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -78,7 +85,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_pod_list_get(osi) .await - }, + } Testcases::UpgradeUpgradeconfigsVersionMismatch(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -97,7 +104,7 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_nodestatus_idle(osi) .await - }, + } Testcases::UpgradeOSInstaceNodestatusConfig(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -107,7 +114,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_get_with_label(osi.clone()) .await - }, + } Testcases::UpgradeOSInstaceNodestatusIdle(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -123,7 +130,7 @@ impl ApiServerVerifier { .unwrap() .handler_node_uncordon(osi) .await - }, + } Testcases::ConfigNormal(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -139,7 +146,7 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_nodestatus_idle(osi) .await - }, + } Testcases::ConfigVersionMismatchReassign(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -152,7 +159,7 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_nodestatus_idle(osi) .await - }, + } Testcases::ConfigVersionMismatchUpdate(osi) => { self.handler_osinstance_get_exist(osi.clone()) .await @@ -165,7 +172,26 @@ impl ApiServerVerifier { .unwrap() .handler_osinstance_patch_spec_sysconfig_v2(osi) .await - }, + } + Testcases::Rollback(osi) => { + self.handler_osinstance_get_exist(osi.clone()) + .await + .unwrap() + .handler_osinstance_get_exist(osi.clone()) + .await + .unwrap() + .handler_node_get_with_label(osi.clone()) + .await + .unwrap() + .handler_osinstance_patch_upgradeconfig_v2(osi.clone()) + .await + .unwrap() + .handler_node_cordon(osi.clone()) + .await + .unwrap() + .handler_node_pod_list_get(osi) + .await + } } .expect("Case completed without errors"); }) @@ -437,6 +463,7 @@ impl ApiServerVerifier { pub mod mock_error { use thiserror::Error; + #[derive(Error, Debug)] pub enum Error { #[error("Kubernetes reported error: {source}")] @@ -447,17 +474,27 @@ pub mod mock_error { } } -impl ProxyController { - pub fn test() -> (ProxyController, ApiServerVerifier) { +mock! { + pub AgentCallClient{} + impl AgentCall for AgentCallClient{ + fn call_agent(&self, client:&Client, method: T) -> Result<(), agent_error::Error> { + Ok(()) + } + } + +} +impl ProxyController { + pub fn test() -> (ProxyController, ApiServerVerifier) { let (mock_service, handle) = tower_test::mock::pair::, Response>(); - let mock_k8s_client = Client::new(mock_service, "default"); + let mock_k8s_client = KubeClient::new(mock_service, "default"); let mock_api_client = ControllerClient::new(mock_k8s_client.clone()); - let mut mock_agent_client: MockAgentMethod = MockAgentMethod::new(); - mock_agent_client.expect_rollback_method().returning(|_x| Ok(())); - mock_agent_client.expect_prepare_upgrade_method().returning(|_x, _y| Ok(())); - mock_agent_client.expect_upgrade_method().returning(|_x| Ok(())); - mock_agent_client.expect_configure_method().returning(|_x, _y| Ok(())); - let proxy_controller: ProxyController = + let mut mock_agent_call_client = MockAgentCallClient::new(); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + mock_agent_call_client.expect_call_agent::().returning(|_x, _y| Ok(())); + let mock_agent_client = AgentClient::new("test", mock_agent_call_client); + let proxy_controller: ProxyController = ProxyController::new(mock_k8s_client, mock_api_client, mock_agent_client); (proxy_controller, ApiServerVerifier(handle)) } @@ -495,7 +532,7 @@ impl OSInstance { } pub fn set_osi_nodestatus_config(node_name: &str, namespace: &str) -> Self { - // return osinstance with nodestatus = upgrade, upgradeconfig.version=v1, sysconfig.version=v1 + // return osinstance with nodestatus = config, upgradeconfig.version=v1, sysconfig.version=v1 let mut osinstance = OSInstance::set_osi_default(node_name, namespace); osinstance.spec.nodestatus = NODE_STATUS_CONFIG.to_string(); osinstance @@ -512,7 +549,18 @@ impl OSInstance { // return osinstance with nodestatus = upgrade, upgradeconfig.version=v2, sysconfig.version=v1 let mut osinstance = OSInstance::set_osi_default(node_name, namespace); osinstance.spec.nodestatus = NODE_STATUS_UPGRADE.to_string(); - osinstance.spec.upgradeconfigs.as_mut().unwrap().version = Some(String::from("v2")); + osinstance.spec.upgradeconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); osinstance } @@ -520,7 +568,18 @@ impl OSInstance { // return osinstance with nodestatus = upgrade, upgradeconfig.version=v2, sysconfig.version=v1 let mut osinstance = OSInstance::set_osi_default(node_name, namespace); osinstance.spec.nodestatus = NODE_STATUS_CONFIG.to_string(); - osinstance.spec.sysconfigs.as_mut().unwrap().version = Some(String::from("v2")); + osinstance.spec.sysconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); osinstance } } @@ -549,7 +608,37 @@ impl OS { pub fn set_os_syscon_v2_opstype_config() -> Self { let mut os = OS::set_os_default(); os.spec.opstype = String::from("config"); - os.spec.sysconfigs = Some(Configs { version: Some(String::from("v2")), configs: None }); + os.spec.sysconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); + os + } + + pub fn set_os_rollback_osversion_v2_upgradecon_v2() -> Self { + let mut os = OS::set_os_default(); + os.spec.osversion = String::from("KubeOS v2"); + os.spec.opstype = OPERATION_TYPE_ROLLBACK.to_string(); + os.spec.upgradeconfigs = Some(Configs { + version: Some(String::from("v2")), + configs: Some(vec![Config { + model: Some(String::from("kernel.sysctl.persist")), + configpath: Some(String::from("/persist/persist.conf")), + contents: Some(vec![Content { + key: Some(String::from("kernel.test")), + value: Some(String::from("test")), + operation: Some(String::from("delete")), + }]), + }]), + }); os } } diff --git a/KubeOS-Rust/proxy/src/controller/controller.rs b/KubeOS-Rust/proxy/src/controller/controller.rs index b2bb332..c21f304 100644 --- a/KubeOS-Rust/proxy/src/controller/controller.rs +++ b/KubeOS-Rust/proxy/src/controller/controller.rs @@ -24,10 +24,8 @@ use kube::{ use log::{debug, error, info}; use reconciler_error::Error; -#[cfg_attr(test, double)] -use super::agentclient::agent_call::AgentCallClient; use super::{ - agentclient::{AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo}, + agentclient::{AgentCall, AgentClient, AgentMethod, ConfigInfo, KeyInfo, Sysconfig, UpgradeInfo}, apiclient::ApplyApi, crd::{Configs, Content, OSInstance, OS}, utils::{check_version, get_config_version, ConfigOperation, ConfigType}, @@ -36,12 +34,10 @@ use super::{ REQUEUE_ERROR, REQUEUE_NORMAL, }, }; -#[cfg(test)] -use mockall_double::double; -pub async fn reconcile( +pub async fn reconcile( os: OS, - ctx: Context>, + ctx: Context>, ) -> Result { debug!("start reconcile"); let proxy_controller = ctx.get_ref(); @@ -76,7 +72,7 @@ pub async fn reconcile( ) .await?; return Ok(REQUEUE_NORMAL); - }, + } ConfigOperation::UpdateConfig => { debug!("start update config"); osinstance.spec.sysconfigs = os_cr.spec.sysconfigs.clone(); @@ -85,8 +81,8 @@ pub async fn reconcile( .update_osinstance_spec(&osinstance.name(), &namespace, &osinstance.spec) .await?; return Ok(REQUEUE_ERROR); - }, - _ => {}, + } + _ => {} } proxy_controller.set_config(&mut osinstance, ConfigType::SysConfig).await?; proxy_controller @@ -108,8 +104,8 @@ pub async fn reconcile( ) .await?; return Ok(REQUEUE_NORMAL); - }, - _ => {}, + } + _ => {} } if node.labels().contains_key(LABEL_UPGRADING) { if osinstance.spec.nodestatus == NODE_STATUS_IDLE { @@ -133,9 +129,9 @@ pub async fn reconcile( Ok(REQUEUE_NORMAL) } -pub fn error_policy( +pub fn error_policy( error: &Error, - _ctx: Context>, + _ctx: Context>, ) -> ReconcilerAction { error!("Reconciliation error:{}", error.to_string()); REQUEUE_ERROR @@ -145,31 +141,31 @@ struct ControllerResources { osinstance: OSInstance, node: Node, } -pub struct ProxyController { +pub struct ProxyController { k8s_client: Client, controller_client: T, - agent_client: U, + agent_client: AgentClient, } -impl ProxyController { - pub fn new(k8s_client: Client, controller_client: T, agent_client: U) -> Self { +impl ProxyController { + pub fn new(k8s_client: Client, controller_client: T, agent_client: AgentClient) -> Self { ProxyController { k8s_client, controller_client, agent_client } } } -impl ProxyController { +impl ProxyController { async fn check_osi_exisit(&self, namespace: &str, node_name: &str) -> Result<(), Error> { let osi_api: Api = Api::namespaced(self.k8s_client.clone(), namespace); match osi_api.get(node_name).await { Ok(osi) => { debug!("osinstance is exist {:?}", osi.name()); return Ok(()); - }, + } Err(kube::Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => { info!("Create OSInstance {}", node_name); self.controller_client.create_osinstance(node_name, namespace).await?; Ok(()) - }, + } Err(err) => Err(Error::KubeError { source: err }), } } @@ -257,17 +253,16 @@ impl ProxyController { if config_info.need_config { match config_info.configs.and_then(convert_to_agent_config) { Some(agent_configs) => { - let agent_call_client = AgentCallClient::default(); - match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }, agent_call_client) { - Ok(_resp) => {}, + match self.agent_client.configure_method(ConfigInfo { configs: agent_configs }) { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } - }, + } None => { - info!("config is none, no need to config"); - }, + info!("config is none, No content can be configured."); + } }; self.update_osi_status(osinstance, config_type).await?; } @@ -290,35 +285,34 @@ impl ProxyController { clientcert: os_cr.spec.clientcert.clone().unwrap_or_default(), clientkey: os_cr.spec.clientkey.clone().unwrap_or_default(), }; - let agent_call_client = AgentCallClient::default(); - match self.agent_client.prepare_upgrade_method(upgrade_info, agent_call_client) { - Ok(_resp) => {}, + + match self.agent_client.prepare_upgrade_method(upgrade_info) { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?; - let agent_call_client = AgentCallClient::default(); - match self.agent_client.upgrade_method(agent_call_client) { - Ok(_resp) => {}, + match self.agent_client.upgrade_method() { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } - }, + } OPERATION_TYPE_ROLLBACK => { self.evict_node(&node.name(), os_cr.spec.evictpodforce).await?; - let agent_call_client = AgentCallClient::default(); - match self.agent_client.rollback_method(agent_call_client) { - Ok(_resp) => {}, + + match self.agent_client.rollback_method() { + Ok(_resp) => {} Err(e) => { return Err(Error::AgentError { source: e }); - }, + } } - }, + } _ => { return Err(Error::OperationError { value: os_cr.spec.opstype.clone() }); - }, + } } Ok(()) } @@ -329,12 +323,12 @@ impl ProxyController { node_api.cordon(node_name).await?; info!("Cordon node Successfully{}, start drain nodes", node_name); match self.drain_node(node_name, evict_pod_force).await { - Ok(()) => {}, + Ok(()) => {} Err(e) => { node_api.uncordon(node_name).await?; info!("Drain node {} error, uncordon node successfully", node_name); return Err(e); - }, + } } Ok(()) } @@ -360,7 +354,7 @@ fn convert_to_agent_config(configs: Configs) -> Option> { contents: contents_tmp, }; agent_configs.push(config_tmp) - }, + } None => { info!( "model {} which has configpath {} do not has any contents no need to configure", @@ -368,7 +362,7 @@ fn convert_to_agent_config(configs: Configs) -> Option> { config.configpath.unwrap_or_default() ); continue; - }, + } }; } if agent_configs.len() == 0 { @@ -437,16 +431,13 @@ pub mod reconciler_error { #[cfg(test)] mod test { use super::{error_policy, reconcile, Context, OSInstance, ProxyController, OS}; + use crate::controller::apiserver_mock::{timeout_after_5s, MockAgentCallClient, Testcases}; use crate::controller::ControllerClient; - use crate::controller::{ - agentclient::MockAgentMethod, - apiserver_mock::{timeout_after_5s, Testcases}, - }; use std::env; #[tokio::test] async fn test_create_osinstance_with_no_upgrade_or_configuration() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_default(); let context = Context::new(test_proxy_controller); @@ -457,7 +448,7 @@ mod test { } #[tokio::test] async fn test_upgrade_normal() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_upgradecon_v2(); let context = Context::new(test_proxy_controller); @@ -471,7 +462,7 @@ mod test { #[tokio::test] async fn test_diff_osversion_opstype_config() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -488,7 +479,7 @@ mod test { #[tokio::test] async fn test_upgradeconfigs_version_mismatch() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_upgradecon_v2(); let context = Context::new(test_proxy_controller); @@ -501,7 +492,7 @@ mod test { #[tokio::test] async fn test_upgrade_nodestatus_idle() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_osversion_v2_upgradecon_v2(); let context = Context::new(test_proxy_controller); @@ -513,7 +504,7 @@ mod test { #[tokio::test] async fn test_config_normal() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_syscon_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -525,7 +516,7 @@ mod test { #[tokio::test] async fn test_sysconfig_version_mismatch_reassign() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_syscon_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -539,7 +530,7 @@ mod test { #[tokio::test] async fn test_sysconfig_version_mismatch_update() { - let (test_proxy_controller, fakeserver) = ProxyController::::test(); + let (test_proxy_controller, fakeserver) = ProxyController::::test(); env::set_var("NODE_NAME", "openeuler"); let os = OS::set_os_syscon_v2_opstype_config(); let context = Context::new(test_proxy_controller); @@ -550,4 +541,16 @@ mod test { reconcile(os, context.clone()).await.expect("reconciler"); timeout_after_5s(mocksrv).await; } + + #[tokio::test] + async fn test_rollback() { + let (test_proxy_controller, fakeserver) = ProxyController::::test(); + env::set_var("NODE_NAME", "openeuler"); + let os = OS::set_os_rollback_osversion_v2_upgradecon_v2(); + let context = Context::new(test_proxy_controller); + let mocksrv = fakeserver + .run(Testcases::Rollback(OSInstance::set_osi_nodestatus_upgrade_upgradecon_v2("openeuler", "default"))); + reconcile(os, context.clone()).await.expect("reconciler"); + timeout_after_5s(mocksrv).await; + } } diff --git a/KubeOS-Rust/proxy/src/controller/mod.rs b/KubeOS-Rust/proxy/src/controller/mod.rs index 73be45c..e30c8df 100644 --- a/KubeOS-Rust/proxy/src/controller/mod.rs +++ b/KubeOS-Rust/proxy/src/controller/mod.rs @@ -19,7 +19,7 @@ mod crd; mod utils; mod values; -pub use agentclient::AgentClient; +pub use agentclient::{AgentCallClient, AgentClient}; pub use apiclient::ControllerClient; pub use controller::{error_policy, reconcile, reconciler_error::Error, ProxyController}; pub use crd::OS; diff --git a/KubeOS-Rust/proxy/src/controller/utils.rs b/KubeOS-Rust/proxy/src/controller/utils.rs index 78502db..0f56878 100644 --- a/KubeOS-Rust/proxy/src/controller/utils.rs +++ b/KubeOS-Rust/proxy/src/controller/utils.rs @@ -56,7 +56,7 @@ impl ConfigType { ); return ConfigOperation::Reassign; } - }, + } ConfigType::SysConfig => { let os_config_version = get_config_version(os.spec.sysconfigs.as_ref()); let osi_config_version = get_config_version(osinstance.spec.sysconfigs.as_ref()); @@ -78,7 +78,7 @@ impl ConfigType { return ConfigOperation::UpdateConfig; } } - }, + } }; ConfigOperation::DoNothing } @@ -96,7 +96,7 @@ impl ConfigType { status_config_version = get_config_version(None); } configs = osinstance.spec.upgradeconfigs.clone(); - }, + } ConfigType::SysConfig => { spec_config_version = get_config_version(osinstance.spec.sysconfigs.as_ref()); if let Some(osinstance_status) = osinstance.status.as_ref() { @@ -105,7 +105,7 @@ impl ConfigType { status_config_version = get_config_version(None); } configs = osinstance.spec.sysconfigs.clone(); - }, + } } debug!( "=======osinstance soec config version is {},status config version is {}", @@ -127,7 +127,7 @@ impl ConfigType { sysconfigs: None, }) } - }, + } ConfigType::SysConfig => { if let Some(osi_status) = &mut osinstance.status { osi_status.sysconfigs = osinstance.spec.sysconfigs.clone(); @@ -135,7 +135,7 @@ impl ConfigType { osinstance.status = Some(OSInstanceStatus { upgradeconfigs: None, sysconfigs: osinstance.spec.sysconfigs.clone() }) } - }, + } } } } diff --git a/KubeOS-Rust/proxy/src/drain.rs b/KubeOS-Rust/proxy/src/drain.rs index 09cf662..72836f9 100644 --- a/KubeOS-Rust/proxy/src/drain.rs +++ b/KubeOS-Rust/proxy/src/drain.rs @@ -66,7 +66,7 @@ async fn get_pods_deleted( Ok(pods @ ObjectList { .. }) => pods, Err(err) => { return Err(GetPodListsError { source: err, node_name: node_name.to_string() }); - }, + } }; let mut filterd_pods_list: Vec = Vec::new(); let mut filterd_err: Vec = Vec::new(); @@ -189,14 +189,14 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e let name = (&p).name_any(); info!("Pod {} deleted.", name); break; - }, + } Ok(_) => { info!("Pod '{}' is not yet deleted. Waiting {}s.", pod.name_any(), EVERY_DELETION_CHECK.as_secs_f64()); - }, + } Err(kube::Error::Api(e)) if e.code == response_error_not_found => { info!("Pod {} is deleted.", pod.name_any()); break; - }, + } Err(e) => { error!( "Get pod {} reported error: '{}', whether pod is deleted cannot be determined, waiting {}s.", @@ -204,7 +204,7 @@ async fn wait_for_deletion(k8s_client: &kube::Client, pod: &Pod) -> Result<(), e e, EVERY_DELETION_CHECK.as_secs_f64() ); - }, + } } if start_time.elapsed() > TIMEOUT { return Err(WaitDeletionError { pod_name: pod.name_any(), max_wait: TIMEOUT }); @@ -241,7 +241,7 @@ impl PodFilter for FinishedOrFailedFilter { return match pod.status.as_ref() { Some(PodStatus { phase: Some(phase), .. }) if phase == "Failed" || phase == "Succeeded" => { FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay) - }, + } _ => FilterResult::create_filter_result(false, "", PodDeleteStatus::Okay), }; } @@ -269,7 +269,7 @@ impl PodFilter for DaemonFilter { let description = format!("Cannot drain Pod '{}': Pod is member of a DaemonSet", pod.name_any()); Box::new(FilterResult { result: false, desc: description, status: PodDeleteStatus::Error }) } - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -287,7 +287,7 @@ impl PodFilter for MirrorFilter { Some(annotations) if annotations.contains_key("kubernetes.io/config.mirror") => { let description = format!("Ignore Pod '{}': Pod is a static Mirror Pod", pod.name_any()); FilterResult::create_filter_result(false, &description.to_string(), PodDeleteStatus::Warning) - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -312,7 +312,7 @@ impl PodFilter for LocalStorageFilter { let description = format!("Cannot drain Pod '{}': Pod has local Storage", pod.name_any()); Box::new(FilterResult { result: false, desc: description, status: PodDeleteStatus::Error }) } - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -365,7 +365,7 @@ impl PodFilter for DeletedFilter { && now - Duration::from_secs(time.0.timestamp() as u64) >= self.delete_wait_timeout => { FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay) - }, + } _ => FilterResult::create_filter_result(true, "", PodDeleteStatus::Okay), }; } @@ -471,7 +471,7 @@ impl ErrorHandleStrategy { return match self { Self::TolerateStrategy => { return backoff.take(0); - }, + } Self::RetryStrategy => backoff.take(MAX_RETRIES_TIMES), }; @@ -488,7 +488,7 @@ impl tokio_retry::Condition for ErrorHandleStrategy { } else { false } - }, + } } } } diff --git a/KubeOS-Rust/proxy/src/main.rs b/KubeOS-Rust/proxy/src/main.rs index cd601d0..ad36b64 100644 --- a/KubeOS-Rust/proxy/src/main.rs +++ b/KubeOS-Rust/proxy/src/main.rs @@ -20,7 +20,9 @@ use kube::{ }; use log::{error, info}; mod controller; -use controller::{error_policy, reconcile, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH}; +use controller::{ + error_policy, reconcile, AgentCallClient, AgentClient, ControllerClient, ProxyController, OS, SOCK_PATH, +}; const PROXY_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); #[tokio::main] @@ -29,14 +31,15 @@ async fn main() -> Result<()> { let client = Client::try_default().await?; let os: Api = Api::all(client.clone()); let controller_client = ControllerClient::new(client.clone()); - let agent_client = AgentClient::new(SOCK_PATH); + let agent_call_client = AgentCallClient::default(); + let agent_client = AgentClient::new(SOCK_PATH, agent_call_client); let proxy_controller = ProxyController::new(client, controller_client, agent_client); info!("os-proxy version is {}, start renconcile", PROXY_VERSION.unwrap_or("Not Found")); Controller::new(os, ListParams::default()) .run(reconcile, error_policy, Context::new(proxy_controller)) .for_each(|res| async move { match res { - Ok(_o) => {}, + Ok(_o) => {} Err(e) => error!("reconcile failed: {}", e.to_string()), } }) -- 2.34.1