From a80ad5b0aef20f9f5d6e5f06450311c39075a16e Mon Sep 17 00:00:00 2001 From: huyubiao Date: Thu, 25 May 2023 08:22:31 +0800 Subject: [PATCH] fix: inconsistent data in app memory and db after daemon-reload or daemon-reexec --- coms/mount/src/rentry.rs | 16 ++++++-- coms/service/src/mng.rs | 1 + coms/service/src/rentry.rs | 16 ++++++-- coms/socket/src/mng.rs | 11 ++--- coms/socket/src/rentry.rs | 24 ++++++++--- coms/target/src/rentry.rs | 8 +++- core/bin/job/entry.rs | 4 +- core/bin/job/manager.rs | 9 +++++ core/bin/job/table.rs | 30 ++++++++++++++ core/bin/unit/datastore/child.rs | 9 +++++ core/bin/unit/datastore/deps.rs | 5 +++ core/bin/unit/datastore/mod.rs | 8 ++++ core/bin/unit/entry/uentry.rs | 11 +++++ core/bin/unit/entry/unitx.rs | 4 ++ core/bin/unit/manager.rs | 28 +++++++++++++ core/bin/unit/runtime.rs | 11 +++++ core/lib/rel/api.rs | 30 ++++++++++---- core/lib/rel/base.rs | 69 +++++++++++++++++++++++--------- core/lib/rel/history.rs | 23 +++++++++-- core/lib/rel/station.rs | 12 ++++++ 20 files changed, 277 insertions(+), 52 deletions(-) diff --git a/coms/mount/src/rentry.rs b/coms/mount/src/rentry.rs index 814ad21..45c0ce0 100644 --- a/coms/mount/src/rentry.rs +++ b/coms/mount/src/rentry.rs @@ -103,12 +103,16 @@ impl ReDbTable for MountReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } @@ -121,11 +125,15 @@ impl ReDbTable for MountReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } diff --git 
a/coms/service/src/mng.rs b/coms/service/src/mng.rs index d73fa34..0f5db5f 100644 --- a/coms/service/src/mng.rs +++ b/coms/service/src/mng.rs @@ -130,6 +130,7 @@ impl ReStation for ServiceMng { } fn entry_clear(&self) { + // pid_file is a transient file that can be directly closed self.unwatch_pid_file(); self.stop_watchdog(); diff --git a/coms/service/src/rentry.rs b/coms/service/src/rentry.rs index df8a255..175828e 100644 --- a/coms/service/src/rentry.rs +++ b/coms/service/src/rentry.rs @@ -598,12 +598,16 @@ impl ReDbTable for ServiceReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } @@ -616,11 +620,15 @@ impl ReDbTable for ServiceReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } diff --git a/coms/socket/src/mng.rs b/coms/socket/src/mng.rs index e079acd..e7a6a01 100644 --- a/coms/socket/src/mng.rs +++ b/coms/socket/src/mng.rs @@ -109,13 +109,14 @@ impl ReStation for SocketMng { // reload: entry-only fn entry_coldplug(&self) { - if self.state() != SocketState::Listening { + if self.state() == SocketState::Listening { self.watch_fds(); } } fn entry_clear(&self) { - self.close_fds(); + // port fd is a long-term monitoring file and cannot be closed + self.unwatch_fds(); } } @@ -690,11 +691,11 @@ impl SocketMng { } fn map_ports_fd(&self, rports: Vec<(PortType, String, RawFd)>) { - assert_eq!(rports.len(), self.ports().len()); - for (p_type, listen, fd) in rports.iter() { match self.ports_find(*p_type, listen) 
{ - Some(port) => port.set_fd(self.comm.reli().fd_take(*fd)), + Some(port) => { + port.set_fd(self.comm.reli().fd_take(*fd)); + } None => log::debug!("Not find {:?}:{:?}", *p_type, listen), } } diff --git a/coms/socket/src/rentry.rs b/coms/socket/src/rentry.rs index 1794e69..2409c84 100644 --- a/coms/socket/src/rentry.rs +++ b/coms/socket/src/rentry.rs @@ -330,12 +330,16 @@ impl ReDbTable for SocketReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } @@ -348,12 +352,16 @@ impl ReDbTable for SocketReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } @@ -366,11 +374,15 @@ impl ReDbTable for SocketReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.0.set_switch(switch); } } diff --git a/coms/target/src/rentry.rs b/coms/target/src/rentry.rs index 8f0ce69..9d3f90c 100644 --- a/coms/target/src/rentry.rs +++ b/coms/target/src/rentry.rs @@ -77,11 +77,15 @@ impl ReDbTable for TargetReDb { self.0.cache_2_db(db_wtxn); } + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.0.rebuf_2_db(db_wtxn); + } + fn import<'a>(&self, db_rtxn: &ReDbRoTxn) { self.0.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.0.set_ignore(ignore); + fn switch_set(&self, 
switch: bool) { + self.0.set_switch(switch); } } diff --git a/core/bin/job/entry.rs b/core/bin/job/entry.rs index 55a0015..54af665 100755 --- a/core/bin/job/entry.rs +++ b/core/bin/job/entry.rs @@ -472,7 +472,7 @@ impl Job { last_rkind != rkind } - fn rentry_trigger_insert(&self) { + pub(super) fn rentry_trigger_insert(&self) { self.rentry .trigger_insert(self.unit.id(), self.kind, &self.attr.borrow()); } @@ -485,7 +485,7 @@ impl Job { self.rentry.trigger_get(self.unit.id()) } - fn rentry_suspends_insert(&self) { + pub(super) fn rentry_suspends_insert(&self) { self.rentry .suspends_insert(self.unit.id(), self.kind, &self.attr.borrow()); } diff --git a/core/bin/job/manager.rs b/core/bin/job/manager.rs index f5a247d..e8eee8b 100755 --- a/core/bin/job/manager.rs +++ b/core/bin/job/manager.rs @@ -108,6 +108,10 @@ impl ReStation for JobManager { self.data.db_map(); } + fn db_insert(&self) { + self.data.db_insert(); + } + // reload: special entry_coldplug // repeating protection fn entry_clear(&self) { @@ -386,6 +390,11 @@ impl JobManagerData { self.stat.clear_cnt(); // no history } + pub(self) fn db_insert(&self) { + self.jobs.rentry_insert_trigger(); + self.jobs.rentry_insert_suspend(); + } + pub(self) fn coldplug_unit(&self, unit: &UnitX) { // trigger self.jobs.coldplug_trigger(unit); diff --git a/core/bin/job/table.rs b/core/bin/job/table.rs index 91ac45b..7b77b1a 100644 --- a/core/bin/job/table.rs +++ b/core/bin/job/table.rs @@ -60,6 +60,12 @@ impl JobTable { job } + pub(super) fn rentry_insert_suspend(&self) { + for job in self.t_unit.borrow().get_all_suspends() { + job.rentry_suspends_insert(); + } + } + pub(super) fn rentry_map_trigger(&self, ja: &JobAlloc, config: &JobConf) -> Rc { let job = ja.alloc(config); job.rentry_map_trigger(); @@ -67,6 +73,12 @@ impl JobTable { job } + pub(super) fn rentry_insert_trigger(&self) { + for job in self.t_unit.borrow().get_all_triggers() { + job.rentry_trigger_insert(); + } + } + pub(super) fn coldplug_suspend(&self, unit: 
&UnitX) { for job in self.t_unit.borrow().get_suspends(unit).iter() { job.coldplug_suspend(); @@ -712,6 +724,24 @@ impl JobUnitTable { jobs } + pub(self) fn get_all_suspends(&self) -> Vec> { + let mut jobs = Vec::new(); + for (_, uv) in self.t_data.iter() { + jobs.append(&mut uv.get_suspends()); + } + jobs + } + + pub(self) fn get_all_triggers(&self) -> Vec> { + let mut jobs = Vec::new(); + for (_, uv) in self.t_data.iter() { + if let Some(job) = uv.get_trigger() { + jobs.push(job); + } + } + jobs + } + pub(self) fn get_trigger_info(&self, unit: &UnitX) -> Option<(Rc, bool)> { if let Some(uv) = self.t_data.get(unit) { uv.get_trigger() diff --git a/core/bin/unit/datastore/child.rs b/core/bin/unit/datastore/child.rs index af63792..60c73c5 100644 --- a/core/bin/unit/datastore/child.rs +++ b/core/bin/unit/datastore/child.rs @@ -37,6 +37,10 @@ impl ReStation for UnitChild { self.data.db_map(&self.units); } + fn db_insert(&self) { + self.data.db_insert(); + } + // reload fn entry_clear(&self) { self.data.entry_clear(); @@ -132,6 +136,11 @@ impl UnitChildData { } } + pub(self) fn db_insert(&self) { + // [key:pid, data:unit[s]] and [key:unit, data:pid[s]](RELI_DB_HUNIT_CHILD) are equivalent + // So here [key:unit, data:pids] is reused and [key:pid, data:unit[s]] is not stored in the database. + } + pub(self) fn add_watch_pid(&self, unit: Rc, pid: Pid) { let mut watch_pids = self.watch_pids.borrow_mut(); watch_pids.insert(pid, unit); diff --git a/core/bin/unit/datastore/deps.rs b/core/bin/unit/datastore/deps.rs index cd38d45..80836ef 100644 --- a/core/bin/unit/datastore/deps.rs +++ b/core/bin/unit/datastore/deps.rs @@ -39,6 +39,11 @@ impl ReStation for UnitDep { self.sub.data.borrow_mut().db_map(); } + fn db_insert(&self) { + // If the data changes under db_map() when reload is true, it needs to be inserted. + // db_map currently does nothing to do. 
+ } + // reload fn entry_clear(&self) { self.sub.data.borrow_mut().clear(); diff --git a/core/bin/unit/datastore/mod.rs b/core/bin/unit/datastore/mod.rs index 1dc09d7..2a5f31b 100644 --- a/core/bin/unit/datastore/mod.rs +++ b/core/bin/unit/datastore/mod.rs @@ -70,6 +70,14 @@ impl UnitDb { self.child.db_map(reload); } + pub(super) fn db_insert_excl_units(&self) { + // dep + self.dep.db_insert(); + + // child + self.child.db_insert(); + } + pub fn units_insert(&self, name: String, unit: Rc) -> Option> { self.units.insert(name, unit) } diff --git a/core/bin/unit/entry/uentry.rs b/core/bin/unit/entry/uentry.rs index fd05746..a22ad10 100644 --- a/core/bin/unit/entry/uentry.rs +++ b/core/bin/unit/entry/uentry.rs @@ -94,6 +94,17 @@ impl ReStation for Unit { self.sub.db_map(reload); } + // data insert + fn db_insert(&self) { + self.base.db_insert(); + self.config.db_insert(); + self.cgroup.db_insert(); + self.load.db_insert(); + self.child.db_insert(); + + self.sub.db_insert(); + } + // reload: entry-only fn entry_coldplug(&self) { // rebuild external connections, like: timer, ... 
diff --git a/core/bin/unit/entry/unitx.rs b/core/bin/unit/entry/unitx.rs index 9ffd549..d9878f4 100644 --- a/core/bin/unit/entry/unitx.rs +++ b/core/bin/unit/entry/unitx.rs @@ -38,6 +38,10 @@ impl ReStation for UnitX { self.0.db_map(reload); } + fn db_insert(&self) { + self.0.db_insert(); + } + // reload: entry-only fn entry_coldplug(&self) { self.0.entry_coldplug(); diff --git a/core/bin/unit/manager.rs b/core/bin/unit/manager.rs index bee3093..6e83240 100644 --- a/core/bin/unit/manager.rs +++ b/core/bin/unit/manager.rs @@ -1198,6 +1198,28 @@ impl ReStation for UnitManager { self.sms.db_map(reload); } + // data + fn db_insert(&self) { + for unit in self.db.units_get_all(None).iter() { + unit.db_insert(); + } + + /* others: unit-dep and unit-child */ + self.db.db_insert_excl_units(); + + // rt + self.rt.db_insert(); + + // job + self.jm.db_insert(); + + // notify + self.notify.db_insert(); + + // sub-manager + self.sms.db_insert(); + } + // reload fn register_ex(&self) { // notify @@ -1290,6 +1312,12 @@ mod unit_submanager { } } + pub(super) fn db_insert(&self) { + for (_, sub) in self.db.borrow().iter() { + sub.db_insert(); + } + } + pub(super) fn db_compensate_last( &self, lframe: (u32, Option, Option), diff --git a/core/bin/unit/runtime.rs b/core/bin/unit/runtime.rs index 89e62c5..215a5a7 100644 --- a/core/bin/unit/runtime.rs +++ b/core/bin/unit/runtime.rs @@ -47,6 +47,10 @@ impl ReStation for UnitRT { self.data.db_map(reload); } + fn db_insert(&self) { + self.data.db_insert(); + } + // reload // repeating protection fn entry_clear(&self) { @@ -239,6 +243,13 @@ impl UnitRTData { } } + pub(self) fn db_insert(&self) { + // If the data changes under db_map() when reload is true, it needs to be inserted. + // db_map currently does nothing to do. + // QUEUE_LOAD is nothing to do. + // QUEUE_TARGET_DEPS is nothing to do. 
+ } + pub(self) fn dispatch_load_queue(&self) { log::trace!("dispatch load queue"); diff --git a/core/lib/rel/api.rs b/core/lib/rel/api.rs index cabf24e..60a3543 100644 --- a/core/lib/rel/api.rs +++ b/core/lib/rel/api.rs @@ -151,6 +151,13 @@ impl Reliability { self.input_rebuild(); self.db_compensate(); self.db_map(reload); + if reload { + // On daemon-reload or daemon-reexec, stage all in-memory data in buf, clear the db, and flush the staged data to the db. + self.db_insert(); + self.history.reflush(); + // The db has just been rewritten, so reload the cache from it. + self.history.import(); + } self.make_consistent(); // restore last's ignore @@ -263,12 +270,12 @@ impl Reliability { fn input_rebuild(&self) { // ignore history's input - self.history.ignore_set(true); + self.history.switch_set(true); self.station.input_rebuild(); // restore history's ignore - self.history.ignore_set(false); + self.history.switch_set(false); } fn db_compensate(&self) { @@ -282,15 +289,24 @@ impl Reliability { } /// map data from database - /// If reload is true, determine whether the configuration needs to be reloaded based on the situation. + /// `reload` determines whether the configuration needs to be reloaded, depending on the situation.
fn db_map(&self, reload: bool) { - // ignore history's input - self.history.ignore_set(true); + // route history writes into buf instead of cache + self.history.switch_set(true); self.station.db_map(reload); - // restore history's ignore - self.history.ignore_set(false); + // route history writes back to cache + self.history.switch_set(false); + } + + /// On daemon-reload or daemon-reexec, insert all mapped data into buf (the switch is on only for the duration of this call) + fn db_insert(&self) { + self.history.switch_set(true); + + self.station.db_insert(); + + self.history.switch_set(false); } fn make_consistent(&self) { diff --git a/core/lib/rel/base.rs b/core/lib/rel/base.rs index 782d54c..274b504 100644 --- a/core/lib/rel/base.rs +++ b/core/lib/rel/base.rs @@ -28,7 +28,7 @@ use std::{env, fs}; /// K & V that can be deserialized without borrowing any data from the deserializer. pub struct ReDb { // control - ignore: RefCell, + switch: RefCell, // data db: Database, SerdeBincode>, @@ -36,7 +36,8 @@ pub struct ReDb { add: RefCell>, del: RefCell>, name: String, - //_phantom: PhantomData<&'a K>, + buf: RefCell>, // during daemon-reload or daemon-reexec, data is staged here first and finally flushed to db.
+ //_phantom: PhantomData<&'a K>, } impl ReDbTable for ReDb @@ -52,12 +53,17 @@ where self.cache_2_db(db_wtxn); } + /// daemon-reload or daemon-reexec export all data to database + fn reexport(&self, db_wtxn: &mut ReDbRwTxn) { + self.rebuf_2_db(db_wtxn); + } + fn import(&self, db_rtxn: &ReDbRoTxn) { self.db_2_cache(db_rtxn); } - fn ignore_set(&self, ignore: bool) { - self.set_ignore(ignore); + fn switch_set(&self, switch: bool) { + self.set_switch(switch); } } @@ -70,12 +76,13 @@ where pub fn new(relir: &Reliability, db_name: &str) -> ReDb { let db = relir.create_database(Some(db_name)).unwrap(); ReDb { - ignore: RefCell::new(false), + switch: RefCell::new(false), db, cache: RefCell::new(HashMap::new()), add: RefCell::new(HashMap::new()), del: RefCell::new(HashSet::new()), name: String::from(db_name), + buf: RefCell::new(HashMap::new()), //_phantom: PhantomData, } } @@ -89,19 +96,26 @@ where } /// set the ignore flag of data - pub fn set_ignore(&self, ignore: bool) { - *self.ignore.borrow_mut() = ignore; + pub fn set_switch(&self, switch: bool) { + *self.switch.borrow_mut() = switch; } /// insert a entry pub fn insert(&self, k: K, v: V) { - if self.ignore() { + let switch = self.switch(); + log::debug!( + "ReDb[{}] switch:{:?} insert, key:{:?}, value:{:?}.", + &self.name, + switch, + &k, + &v + ); + + if switch { + self.buf.borrow_mut().insert(k, v); return; } - let n = &self.name; - log::debug!("ReDb[{}] insert, key: {:?}, value: {:?}.", n, &k, &v); - // remove "del" + insert "add" self.del.borrow_mut().remove(&k); self.add.borrow_mut().insert(k.clone(), v.clone()); @@ -112,12 +126,15 @@ where /// remove a entry pub fn remove(&self, k: &K) { - if self.ignore() { + let n = &self.name; + let switch = self.switch(); + log::debug!("ReDb[{}] switch:{:?}, remove, key:{:?}.", n, switch, &k); + + if switch { + self.buf.borrow_mut().remove(k); return; } - log::debug!("ReDb[{}] remove, key: {:?}.", &self.name, &k); - // remove "add" + insert "del" 
self.add.borrow_mut().remove(k); self.del.borrow_mut().insert(k.clone()); @@ -136,6 +153,10 @@ where /// get the existence of the key pub fn contains_key(&self, k: &K) -> bool { + if self.switch() { + return self.buf.borrow().contains_key(k); + } + self.cache.borrow().contains_key(k) } @@ -178,6 +199,16 @@ where self.del.borrow_mut().clear(); } + /// export all data from buf to database, then clear buf + pub fn rebuf_2_db(&self, wtxn: &mut ReDbRwTxn) { + // "buf" -> db.put + clear "buf" + for (k, v) in self.buf.borrow().iter() { + self.db.put(&mut wtxn.0, k, v).expect("history.put"); + } + + self.buf.borrow_mut().clear(); + } + /// emport all data from database to cache pub fn db_2_cache(&self, rtxn: &ReDbRoTxn) where @@ -197,8 +228,8 @@ where } } - fn ignore(&self) -> bool { - *self.ignore.borrow() + fn switch(&self) -> bool { + *self.switch.borrow() } } @@ -228,10 +259,12 @@ pub trait ReDbTable { fn clear(&self, wtxn: &mut ReDbRwTxn); /// export all data to database fn export(&self, wtxn: &mut ReDbRwTxn); + /// export all data staged in buf to database (used by daemon-reload or daemon-reexec) + fn reexport(&self, wtxn: &mut ReDbRwTxn); /// import all data from database fn import(&self, rtxn: &ReDbRoTxn); - /// set the ignore flag of data - fn ignore_set(&self, ignore: bool); + /// set the switch flag of data; the switch decides whether insert/remove/contains_key use buf (true) or cache (false) + fn switch_set(&self, switch: bool); } const RELI_PATH_DIR: &str = "/run/sysmaster/reliability"; diff --git a/core/lib/rel/history.rs b/core/lib/rel/history.rs index 2e2e03b..846f8c5 100644 --- a/core/lib/rel/history.rs +++ b/core/lib/rel/history.rs @@ -84,6 +84,21 @@ impl ReliHistory { db_wtxn.0.commit().expect("history.commit"); } + /// on daemon-reload or daemon-reexec: clear every db and flush the data staged in buf into it + pub fn reflush(&self) { + // create transaction + let mut db_wtxn = ReDbRwTxn::new(&self.env).expect("history.write_txn"); + + // flush to db + for (_, db) in self.dbs.borrow().iter() { + db.clear(&mut db_wtxn); + db.reexport(&mut db_wtxn); + } +
// commit + db_wtxn.0.commit().expect("history.commit"); + } + pub fn import(&self) { let db_rtxn = ReDbRoTxn::new(&self.env).expect("history.write_txn"); @@ -93,11 +108,11 @@ impl ReliHistory { } } - pub fn ignore_set(&self, ignore: bool) { - // set ignore - *self.ignore.borrow_mut() = ignore; + pub fn switch_set(&self, switch: bool) { + // set switch + *self.ignore.borrow_mut() = switch; for (_, db) in self.dbs.borrow().iter() { - db.ignore_set(ignore); + db.switch_set(switch); } } diff --git a/core/lib/rel/station.rs b/core/lib/rel/station.rs index 10e1bed..b06be4a 100644 --- a/core/lib/rel/station.rs +++ b/core/lib/rel/station.rs @@ -90,6 +90,18 @@ impl ReliStation { } } + pub fn db_insert(&self) { + // level 1 + for station in self.get_kind(ReStationKind::Level1).iter() { + station.db_insert(); + } + + // level 2 + for station in self.get_kind(ReStationKind::Level2).iter() { + station.db_insert(); + } + } + pub fn make_consistent( &self, lframe: Option<(u32, Option, Option)>, -- 2.33.0