oeAware-manager/0001-fix-remove-plugin-bug-and-refactor.patch
fly_1997 9b1ab30deb fix spec file and some bugs
(cherry picked from commit ca6063d6790f27ef39fb17c5ec7e917c99d83dce)
2024-04-29 20:28:48 +08:00

1081 lines
40 KiB
Diff

From 47fb3c3a0097675ebc9ee2cdc20806638cb57b4d Mon Sep 17 00:00:00 2001
From: fly_1997 <flylove7@outlook.com>
Date: Thu, 25 Apr 2024 10:31:05 +0800
Subject: [PATCH 1/4] fix remove plugin bug and refactor
---
src/client/client.cpp | 33 +++++----
src/client/client.h | 8 +--
src/client/cmd_handler.h | 3 +-
src/plugin_mgr/dep_handler.cpp | 52 ++++++--------
src/plugin_mgr/dep_handler.h | 15 ++--
src/plugin_mgr/instance_run_handler.cpp | 77 +++++++++++----------
src/plugin_mgr/instance_run_handler.h | 45 +++++-------
src/plugin_mgr/memory_store.h | 67 ++++++++++++++++++
src/plugin_mgr/message_manager.cpp | 2 +-
src/plugin_mgr/message_manager.h | 9 ++-
src/plugin_mgr/plugin.cpp | 24 +++----
src/plugin_mgr/plugin.h | 21 ++----
src/plugin_mgr/plugin_manager.cpp | 92 ++++++++++++-------------
src/plugin_mgr/plugin_manager.h | 21 +++---
14 files changed, 253 insertions(+), 216 deletions(-)
create mode 100644 src/plugin_mgr/memory_store.h
diff --git a/src/client/client.cpp b/src/client/client.cpp
index 2377c4b..72fb6a2 100644
--- a/src/client/client.cpp
+++ b/src/client/client.cpp
@@ -11,6 +11,21 @@
******************************************************************************/
#include "client.h"
+const std::string Client::OPT_STRING = "Qqd:t:l:r:e:";
+const struct option Client::long_options[] = {
+ {"help", no_argument, NULL, 'h'},
+ {"load", required_argument, NULL, 'l'},
+ {"type", required_argument, NULL, 't'},
+ {"remove", required_argument, NULL, 'r'},
+ {"query", required_argument, NULL, 'q'},
+ {"query-dep", required_argument, NULL, 'Q'},
+ {"enable", required_argument, NULL, 'e'},
+ {"disable", required_argument, NULL, 'd'},
+ {"list", no_argument, NULL, 'L'},
+ {"download", required_argument, NULL, 'D'},
+ {0, 0, 0, 0},
+};
+
static void print_error(const Msg &msg) {
for (int i = 0; i < msg.payload_size(); ++i) {
printf("%s\n", msg.payload(i).c_str());
@@ -40,20 +55,6 @@ void Client::run_cmd(int cmd) {
this->cmd_handler->res_handler(res);
}
-const std::string Client::OPT_STRING = "Qqd:t:l:r:e:";
-const struct option Client::long_options[] = {
- {"help", no_argument, NULL, 'h'},
- {"load", required_argument, NULL, 'l'},
- {"type", required_argument, NULL, 't'},
- {"remove", required_argument, NULL, 'r'},
- {"query", required_argument, NULL, 'q'},
- {"query-dep", required_argument, NULL, 'Q'},
- {"enable", required_argument, NULL, 'e'},
- {"disable", required_argument, NULL, 'd'},
- {"list", no_argument, NULL, 'L'},
- {"download", required_argument, NULL, 'D'},
- {0, 0, 0, 0},
-};
int Client::arg_parse(int argc, char *argv[]) {
int cmd = -1;
int opt;
@@ -95,10 +96,12 @@ int Client::arg_parse(int argc, char *argv[]) {
}
return cmd;
}
+
void Client::arg_error(const std::string &msg) {
std::cerr << "oeawarectl: " << msg << "\n";
exit(EXIT_FAILURE);
}
+
void Client::close() {
tcp_socket.clear();
-}
\ No newline at end of file
+}
diff --git a/src/client/client.h b/src/client/client.h
index 0c2b053..9528f13 100644
--- a/src/client/client.h
+++ b/src/client/client.h
@@ -31,9 +31,9 @@ private:
TcpSocket tcp_socket;
CmdHandler *cmd_handler;
- const static std::string OPT_STRING;
- const static int MAX_OPT_SIZE = 11;
- const static struct option long_options[MAX_OPT_SIZE];
+ static const std::string OPT_STRING;
+ static const int MAX_OPT_SIZE = 20;
+ static const struct option long_options[MAX_OPT_SIZE];
};
-#endif // !CLIENT_H
\ No newline at end of file
+#endif // !CLIENT_H
diff --git a/src/client/cmd_handler.h b/src/client/cmd_handler.h
index 0056430..4afae4c 100644
--- a/src/client/cmd_handler.h
+++ b/src/client/cmd_handler.h
@@ -21,7 +21,6 @@ static std::string arg;
class CmdHandler {
public:
- CmdHandler(){}
virtual void handler(Msg &msg) = 0;
virtual void res_handler(Msg &msg) = 0;
};
@@ -80,4 +79,4 @@ void set_type(char* _type);
void set_arg(char* _arg);
void print_help();
-#endif // !CLIENT_CMD_HANDLER_H
\ No newline at end of file
+#endif // !CLIENT_CMD_HANDLER_H
diff --git a/src/plugin_mgr/dep_handler.cpp b/src/plugin_mgr/dep_handler.cpp
index f2e9ba3..9a6ddc5 100644
--- a/src/plugin_mgr/dep_handler.cpp
+++ b/src/plugin_mgr/dep_handler.cpp
@@ -14,28 +14,22 @@
#include <stdio.h>
void DepHandler::add_arc_node(Node* node, const std::vector<std::string> &dep_nodes) {
- ArcNode *arc_head = new ArcNode();
- node->head = arc_head;
+ ArcNode *arc_head = node->head;
node->cnt = dep_nodes.size();
int real_cnt = 0;
bool state = true;
- for (auto val : dep_nodes) {
+ for (auto name : dep_nodes) {
ArcNode *tmp = new ArcNode();
- tmp->val = val;
- tmp->node_name = node->val;
- tmp->next = arc_head->next;
- if (arc_head->next != nullptr) {
- arc_head->next->pre = tmp;
- }
-
- tmp->pre = arc_head;
+ tmp->arc_name = name;
+ tmp->node_name = node->name;
+ tmp->next = arc_head->next;
arc_head->next = tmp;
- if (nodes.count(val)) {
- arc_nodes[val][tmp] = true;
+ if (nodes.count(name)) {
+ arc_nodes[name][tmp] = true;
real_cnt++;
} else {
- arc_nodes[val][tmp] = false;
+ arc_nodes[name][tmp] = false;
state = false;
}
}
@@ -46,15 +40,14 @@ void DepHandler::add_arc_node(Node* node, const std::vector<std::string> &dep_no
void DepHandler::add_node(std::string name, std::vector<std::string> dep_nodes) {
Node *cur_node = add_new_node(name);
- change_arc_nodes(name, true);
- add_arc_node(cur_node, dep_nodes);
this->nodes[name] = cur_node;
+ add_arc_node(cur_node, dep_nodes);
+ change_arc_nodes(name, true);
}
void DepHandler::del_node(std::string name) {
del_node_and_arc_nodes(get_node(name));
this->nodes.erase(name);
- change_arc_nodes(name, false);
}
@@ -65,8 +58,9 @@ Node* DepHandler::get_node(std::string name) {
Node* DepHandler::add_new_node(std::string name) {
Node *cur_node = new Node(name);
+ cur_node->head = new ArcNode();
+
tail->next = cur_node;
- cur_node->pre = tail;
tail = cur_node;
return cur_node;
}
@@ -74,18 +68,15 @@ Node* DepHandler::add_new_node(std::string name) {
void DepHandler::del_node_and_arc_nodes(Node *node) {
- Node *pre = node->pre;
Node *next = node->next;
- pre->next = next;
- if (next != nullptr)
- next->pre = pre;
ArcNode *arc = node->head;
while(arc) {
ArcNode *tmp = arc->next;
if (arc != node->head){
- arc_nodes[node->val].erase(arc);
- if (arc_nodes[node->val].empty()) {
- arc_nodes.erase(node->val);
+ std::string name = arc->arc_name;
+ arc_nodes[name].erase(arc);
+ if (arc_nodes[name].empty()) {
+ arc_nodes.erase(name);
}
}
delete arc;
@@ -95,6 +86,7 @@ void DepHandler::del_node_and_arc_nodes(Node *node) {
delete node;
}
void DepHandler::change_arc_nodes(std::string name, bool state) {
+ if (!nodes[name]->state || !arc_nodes.count(name)) return;
std::unordered_map<ArcNode*, bool> &mp = arc_nodes[name];
for (auto &vec : mp) {
vec.second = state;
@@ -127,7 +119,7 @@ void DepHandler::query_node_top(std::string name, std::vector<std::vector<std::s
return;
}
while (p->next != nullptr) {
- query.emplace_back(std::vector<std::string>{name, p->next->val});
+ query.emplace_back(std::vector<std::string>{name, p->next->arc_name});
p = p->next;
}
}
@@ -137,8 +129,8 @@ void DepHandler::query_node(std::string name, std::vector<std::vector<std::strin
Node *p = nodes[name];
query.emplace_back(std::vector<std::string>{name});
for (auto cur = p->head->next; cur != nullptr; cur = cur->next) {
- query.emplace_back(std::vector<std::string>{name, cur->val});
- query_node(cur->val, query);
+ query.emplace_back(std::vector<std::string>{name, cur->arc_name});
+ query_node(cur->arc_name, query);
}
}
@@ -149,9 +141,9 @@ std::vector<std::string> DepHandler::get_pre_dependencies(std::string name) {
while (!q.empty()) {
auto &node = q.front();
q.pop();
- res.emplace_back(node->val);
+ res.emplace_back(node->name);
for (auto arc_node = node->head->next; arc_node != nullptr; arc_node = arc_node->next) {
- q.push(nodes[arc_node->val]);
+ q.push(nodes[arc_node->arc_name]);
}
}
return res;
diff --git a/src/plugin_mgr/dep_handler.h b/src/plugin_mgr/dep_handler.h
index a18f439..8ff91e2 100644
--- a/src/plugin_mgr/dep_handler.h
+++ b/src/plugin_mgr/dep_handler.h
@@ -19,23 +19,21 @@
struct ArcNode {
ArcNode *next;
- ArcNode *pre;
- std::string val;
+ std::string arc_name;
std::string node_name;
- ArcNode() : next(nullptr), pre(nullptr) {}
+ ArcNode() : next(nullptr) {}
};
// a instance node
struct Node {
Node *next;
- Node *pre;
ArcNode *head;
- std::string val;
+ std::string name;
int cnt;
int real_cnt;
bool state; // dependency closed-loop
- Node() : next(nullptr), pre(nullptr), head(nullptr), state(true), cnt(0), real_cnt(0) {}
- Node(std::string val): val(val), next(nullptr), pre(nullptr), head(nullptr), state(true), cnt(0), real_cnt(0) {}
+ Node() : next(nullptr), head(nullptr), state(true), cnt(0), real_cnt(0) {}
+ Node(std::string name): name(name), next(nullptr), head(nullptr), state(true), cnt(0), real_cnt(0) {}
};
class DepHandler {
@@ -55,6 +53,9 @@ public:
void query_node(std::string name, std::vector<std::vector<std::string>> &query);
// query all instance dependencies
void query_all_top(std::vector<std::vector<std::string>> &query);
+ bool have_dep(const std::string &name) {
+ return arc_nodes.count(name);
+ }
bool is_empty() const {
return nodes.empty();
}
diff --git a/src/plugin_mgr/instance_run_handler.cpp b/src/plugin_mgr/instance_run_handler.cpp
index a7f8b49..162079e 100644
--- a/src/plugin_mgr/instance_run_handler.cpp
+++ b/src/plugin_mgr/instance_run_handler.cpp
@@ -13,7 +13,7 @@
#include <thread>
#include <unistd.h>
-void* get_ring_buf(Instance *instance) {
+static void* get_ring_buf(Instance *instance) {
if (instance == nullptr) {
return nullptr;
}
@@ -32,19 +32,19 @@ static void reflash_ring_buf(Instance *instance) {
((CollectorInstance*)instance)->get_interface()->reflash_ring_buf();
}
-static void run_aware(Instance *instance, std::vector<std::string> &deps, std::unordered_map<std::string, Instance*> *all_instance) {
+void InstanceRunHandler::run_aware(Instance *instance, std::vector<std::string> &deps) { // Gather each dependency's ring buffer and pass them to the scenario's aware().
void *a[MAX_DEPENDENCIES_SIZE];
for (int i = 0; i < deps.size(); ++i) {
- Instance *ins = (*all_instance)[deps[i]];
+ Instance *ins = is_instance_exist(deps[i]) ? memory_store->get_instance(deps[i]) : nullptr; // get_instance() uses at() and throws on unknown names; get_ring_buf() already accepts nullptr
a[i] = get_ring_buf(ins);
}
((ScenarioInstance*)instance)->get_interface()->aware(a, (int)deps.size());
}
-static void run_tune(Instance *instance, std::vector<std::string> &deps, std::unordered_map<std::string, Instance*> *all_instance) {
+void InstanceRunHandler::run_tune(Instance *instance, std::vector<std::string> &deps) { // Gather each dependency's ring buffer and pass them to the tune plugin's tune().
void *a[MAX_DEPENDENCIES_SIZE];
for (int i = 0; i < deps.size(); ++i) {
- Instance *ins = (*all_instance)[deps[i]];
+ Instance *ins = is_instance_exist(deps[i]) ? memory_store->get_instance(deps[i]) : nullptr; // get_instance() uses at() and throws on unknown names; get_ring_buf() already accepts nullptr
a[i] = get_ring_buf(ins);
}
((TuneInstance*)instance)->get_interface()->tune(a, (int)deps.size());
}
}
INFO("[PluginManager] " << instance->get_name() << " instance insert into running queue.");
}
+
void InstanceRunHandler::delete_instance(Instance *instance) {
switch (instance->get_type()) {
case PluginType::COLLECTOR:
@@ -85,7 +86,6 @@ void InstanceRunHandler::delete_instance(Instance *instance) {
INFO("[PluginManager] " << instance->get_name() << " instance delete from running queue.");
}
-
void InstanceRunHandler::handle_instance() {
InstanceRunMessage msg;
while(this->recv_queue_try_pop(msg)){
@@ -100,6 +100,7 @@ void InstanceRunHandler::handle_instance() {
}
}
}
+
template<typename T>
static std::vector<std::string> get_deps(Instance *instance) {
std::string deps = ((T*)instance)->get_interface()->get_dep();
@@ -127,12 +128,12 @@ void InstanceRunHandler::adjust_collector_queue(const std::vector<std::string> &
}
if (ok) continue;
if (flag) {
- if (find(m_dep) && !collector.count(m_dep)) {
- this->insert_instance((*this->all_instance)[m_dep]);
+ if (is_instance_exist(m_dep) && !collector.count(m_dep)) {
+ this->insert_instance(memory_store->get_instance(m_dep));
}
} else {
- if (find(m_dep) && collector.count(m_dep)) {
- this->delete_instance((*this->all_instance)[m_dep]);
+ if (is_instance_exist(m_dep) && collector.count(m_dep)) {
+ this->delete_instance(memory_store->get_instance(m_dep));
}
}
}
@@ -143,26 +144,35 @@ void InstanceRunHandler::check_scenario_dependency(const std::vector<std::string
adjust_collector_queue(cur_deps, origin_deps, false);
}
-static void schedule_collector(Instance *instance, unsigned long long time) {
- int t = ((CollectorInstance*)instance)->get_interface()->get_cycle();
- if (time % t != 0) return;
- reflash_ring_buf(instance);
+void InstanceRunHandler::schedule_collector(uint64_t time) { // Calls reflash_ring_buf() on every running collector whose cycle divides `time`.
+ for (auto &p : collector) {
+ Instance *instance = p.second;
+ int t = ((CollectorInstance*)instance)->get_interface()->get_cycle();
+ if (time % t != 0) continue; // not due on this tick: skip only this collector, the remaining ones must still be checked
+ reflash_ring_buf(instance);
+ }
+}
-static void schedule_scenario(Instance *instance, unsigned long long time, InstanceRunHandler *instance_run_handler) {
- int t = ((ScenarioInstance*)instance)->get_interface()->get_cycle();
- if (time % t != 0) return;
- std::vector<std::string> origin_deps = get_deps<ScenarioInstance>(instance);
- run_aware(instance, origin_deps, instance_run_handler->get_all_instance());
- std::vector<std::string> cur_deps = get_deps<ScenarioInstance>(instance);
- instance_run_handler->check_scenario_dependency(origin_deps, cur_deps);
+void InstanceRunHandler::schedule_scenario(uint64_t time) { // Runs aware() for every running scenario whose cycle divides `time`.
+ for (auto &p : scenario) {
+ Instance *instance = p.second;
+ int t = ((ScenarioInstance*)instance)->get_interface()->get_cycle();
+ if (time % t != 0) continue; // not due on this tick: skip only this scenario, the remaining ones must still be checked
+ std::vector<std::string> origin_deps = get_deps<ScenarioInstance>(instance);
+ run_aware(instance, origin_deps);
+ std::vector<std::string> cur_deps = get_deps<ScenarioInstance>(instance); // dependencies are re-read after aware() so both lists can be compared
+ check_scenario_dependency(origin_deps, cur_deps);
+ }
+}
-static void schedule_tune(Instance *instance, unsigned long long time, InstanceRunHandler *instance_run_handler) {
- int t = ((TuneInstance*)instance)->get_interface()->get_cycle();
- if (time % t != 0) return;
- std::vector<std::string> deps = get_deps<TuneInstance>(instance);
- run_tune(instance, deps, instance_run_handler->get_all_instance());
+void InstanceRunHandler::schedule_tune(uint64_t time) { // Runs tune() for every running tune instance whose cycle divides `time`.
+ for (auto &p : tune) {
+ Instance *instance = p.second;
+ int t = ((TuneInstance*)instance)->get_interface()->get_cycle();
+ if (time % t != 0) continue; // not due on this tick: skip only this instance, the remaining ones must still be checked
+ std::vector<std::string> deps = get_deps<TuneInstance>(instance);
+ run_tune(instance, deps);
+ }
+}
void start(InstanceRunHandler *instance_run_handler) {
@@ -170,16 +180,9 @@ void start(InstanceRunHandler *instance_run_handler) {
INFO("[PluginManager] instance schedule started!");
while(true) {
instance_run_handler->handle_instance();
- for (auto &p : instance_run_handler->get_collector()) {
- schedule_collector(p.second, time);
- }
- for (auto &p : instance_run_handler->get_scenario()) {
- schedule_scenario(p.second, time, instance_run_handler);
- }
-
- for (auto &p : instance_run_handler->get_tune()) {
- schedule_tune(p.second, time, instance_run_handler);
- }
+ instance_run_handler->schedule_collector(time);
+ instance_run_handler->schedule_scenario(time);
+ instance_run_handler->schedule_tune(time);
usleep(instance_run_handler->get_cycle() * 1000);
time += instance_run_handler->get_cycle();
@@ -189,4 +192,4 @@ void start(InstanceRunHandler *instance_run_handler) {
void InstanceRunHandler::run() {
std::thread t(start, this);
t.detach();
-}
\ No newline at end of file
+}
diff --git a/src/plugin_mgr/instance_run_handler.h b/src/plugin_mgr/instance_run_handler.h
index 4730ce8..c9a8dfc 100644
--- a/src/plugin_mgr/instance_run_handler.h
+++ b/src/plugin_mgr/instance_run_handler.h
@@ -15,13 +15,12 @@
#include "safe_queue.h"
#include "plugin.h"
#include "logger.h"
+#include "memory_store.h"
#include <set>
#include <string>
#include <vector>
#include <unordered_map>
-const int DEFAULT_CYCLE_SIZE = 10;
-const int MAX_DEPENDENCIES_SIZE = 20;
enum class RunType {
ENABLED,
DISABLED,
@@ -32,9 +31,6 @@ class InstanceRunMessage {
public:
InstanceRunMessage() {}
InstanceRunMessage(RunType type, Instance *instance) : type(type), instance(instance) {}
- void init() {
-
- }
RunType get_type() {
return type;
}
@@ -51,11 +47,12 @@ class InstanceRunHandler {
public:
InstanceRunHandler() : cycle(DEFAULT_CYCLE_SIZE) {}
void run();
+ void schedule_collector(uint64_t time);
+ void schedule_scenario(uint64_t time);
+ void schedule_tune(uint64_t time);
void handle_instance();
- void delete_instance(Instance *instance);
- void insert_instance(Instance *instance);
- void set_all_instance(std::unordered_map<std::string, Instance*> *all_instance) {
- this->all_instance = all_instance;
+ void set_memory_store(MemoryStore *memory_store) {
+ this->memory_store = memory_store;
}
void set_cycle(int cycle) {
this->cycle = cycle;
@@ -63,20 +60,8 @@ public:
int get_cycle() {
return cycle;
}
- bool find(std::string name) {
- return (*this->all_instance).count(name);
- }
- std::unordered_map<std::string, Instance*> get_collector() {
- return this->collector;
- }
- std::unordered_map<std::string, Instance*> get_scenario() {
- return this->scenario;
- }
- std::unordered_map<std::string, Instance*> get_tune() {
- return this->tune;
- }
- std::unordered_map<std::string, Instance*>* get_all_instance() {
- return this->all_instance;
+ bool is_instance_exist(const std::string &name) {
+ return memory_store->is_instance_exist(name);
}
void recv_queue_push(InstanceRunMessage &msg) {
this->recv_queue.push(msg);
@@ -87,16 +72,22 @@ public:
bool recv_queue_try_pop(InstanceRunMessage &msg) {
return this->recv_queue.try_pop(msg);
}
- void check_scenario_dependency(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps);
private:
+ void run_aware(Instance *instance, std::vector<std::string> &deps);
+ void run_tune(Instance *instance, std::vector<std::string> &deps);
+ void delete_instance(Instance *instance);
+ void insert_instance(Instance *instance);
void adjust_collector_queue(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps, bool flag);
-
+ void check_scenario_dependency(const std::vector<std::string> &deps, const std::vector<std::string> &m_deps);
+
std::unordered_map<std::string, Instance*> collector;
std::unordered_map<std::string, Instance*> scenario;
std::unordered_map<std::string, Instance*> tune;
SafeQueue<InstanceRunMessage> recv_queue;
- std::unordered_map<std::string, Instance*> *all_instance;
+ MemoryStore *memory_store;
int cycle;
+ static const int DEFAULT_CYCLE_SIZE = 10;
+ static const int MAX_DEPENDENCIES_SIZE = 20;
};
-#endif // !PLUGIN_MGR_INSTANCE_RUN_HANDLER_H
\ No newline at end of file
+#endif // !PLUGIN_MGR_INSTANCE_RUN_HANDLER_H
diff --git a/src/plugin_mgr/memory_store.h b/src/plugin_mgr/memory_store.h
new file mode 100644
index 0000000..190fcdd
--- /dev/null
+++ b/src/plugin_mgr/memory_store.h
@@ -0,0 +1,67 @@
+/******************************************************************************
+ * Copyright (c) 2024 Huawei Technologies Co., Ltd.
+ * oeAware is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ ******************************************************************************/
+#ifndef PLUGIN_MGR_MEMORY_STORE_H
+#define PLUGIN_MGR_MEMORY_STORE_H
+#include "plugin.h"
+#include "logger.h"
+#include <unordered_map>
+
+//OeAware memory storage, which is used to store plugins and instances in the memory.
+class MemoryStore { // Name-indexed registry of the Plugin and Instance pointers currently held in memory.
+public:
+ void add_plugin(const std::string &name, Plugin *plugin) {
+ this->plugins.insert(std::make_pair(name, plugin)); // insert() leaves an existing entry for `name` untouched
+ }
+ void add_instance(const std::string &name, Instance *instance) {
+ this->instances.insert(std::make_pair(name, instance)); // insert() leaves an existing entry for `name` untouched
+ }
+ Plugin* get_plugin(const std::string &name) const {
+ return this->plugins.at(name); // throws std::out_of_range when `name` is not stored
+ }
+ Instance* get_instance(const std::string &name) const {
+ return this->instances.at(name); // throws std::out_of_range when `name` is not stored
+ }
+ void delete_plugin(const std::string &name) {
+ Plugin *plugin = plugins.at(name); // throws std::out_of_range when `name` is not stored
+ this->plugins.erase(name);
+ delete plugin; // the Plugin object itself is freed here
+ }
+ void delete_instance(const std::string &name) {
+ // Only the map entry is dropped; the Instance is not freed here (NOTE(review): presumably its Plugin owns it - confirm).
+ this->instances.erase(name); // no-op when `name` is not stored
+ }
+ bool is_plugin_exist(const std::string &name) const {
+ return this->plugins.count(name);
+ }
+ bool is_instance_exist(const std::string &name) const {
+ return this->instances.count(name);
+ }
+ std::vector<Plugin*> get_all_plugins() { // copy of the stored pointers; order follows unordered_map iteration
+ std::vector<Plugin*> res;
+ for (auto &p : plugins) {
+ res.emplace_back(p.second);
+ }
+ return res;
+ }
+ std::vector<Instance*> get_all_instances() { // copy of the stored pointers; order follows unordered_map iteration
+ std::vector<Instance*> res;
+ for (auto &p : instances) {
+ res.emplace_back(p.second);
+ }
+ return res;
+ }
+private:
+ std::unordered_map<std::string, Plugin*> plugins;
+ std::unordered_map<std::string, Instance*> instances;
+};
+
+#endif // !PLUGIN_MGR_MEMORY_STORE_H
diff --git a/src/plugin_mgr/message_manager.cpp b/src/plugin_mgr/message_manager.cpp
index 92f467f..296c682 100644
--- a/src/plugin_mgr/message_manager.cpp
+++ b/src/plugin_mgr/message_manager.cpp
@@ -80,7 +80,7 @@ void TcpSocket::serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message>
for (int i = 0; i < num; ++i) {
int cur_fd = evs[i].data.fd;
if (cur_fd == sock) {
- conn = accept(cur_fd, NULL, NULL);
+ int conn = accept(cur_fd, NULL, NULL);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = conn;
diff --git a/src/plugin_mgr/message_manager.h b/src/plugin_mgr/message_manager.h
index d900c6f..d1faad3 100644
--- a/src/plugin_mgr/message_manager.h
+++ b/src/plugin_mgr/message_manager.h
@@ -68,15 +68,14 @@ private:
class TcpSocket {
public:
- TcpSocket() : conn(-1) {}
- int domain_listen(const char *name);
+ TcpSocket() {}
bool init();
void serve_accept(SafeQueue<Message> *handler_msg, SafeQueue<Message> *res_msg);
+private:
+ int domain_listen(const char *name);
+
int sock;
- struct sockaddr_in addr;
- int conn;
int epfd;
- struct epoll_event ev;
};
class MessageManager {
diff --git a/src/plugin_mgr/plugin.cpp b/src/plugin_mgr/plugin.cpp
index 091e73b..1a2db0d 100644
--- a/src/plugin_mgr/plugin.cpp
+++ b/src/plugin_mgr/plugin.cpp
@@ -11,6 +11,11 @@
******************************************************************************/
#include "plugin.h"
+const std::string Instance::PLUGIN_ENABLED = "running";
+const std::string Instance::PLUGIN_DISABLED = "close";
+const std::string Instance::PLUGIN_STATE_ON = "available";
+const std::string Instance::PLUGIN_STATE_OFF = "unavailable";
+
int Plugin::load(const std::string dl_path) {
void *handler = dlopen(dl_path.c_str(), RTLD_LAZY);
if (handler == nullptr) {
@@ -20,17 +25,8 @@ int Plugin::load(const std::string dl_path) {
return 0;
}
-std::string plugin_type_to_string(PluginType type) {
- switch (type) {
- case PluginType::COLLECTOR: {
- return COLLECTOR_TEXT;
- }
- case PluginType::SCENARIO: {
- return SCENARIO_TEXT;
- }
- case PluginType::TUNE: {
- return TUNE_TEXT;
- }
- }
- return "";
-}
\ No newline at end of file
+std::string Instance::get_info() const {
+ std::string state_text = this->state ? PLUGIN_STATE_ON : PLUGIN_STATE_OFF;
+ std::string run_text = this->enabled ? PLUGIN_ENABLED : PLUGIN_DISABLED;
+ return name + "(" + state_text + ", " + run_text + ")";
+}
diff --git a/src/plugin_mgr/plugin.h b/src/plugin_mgr/plugin.h
index 892cb74..11b4748 100644
--- a/src/plugin_mgr/plugin.h
+++ b/src/plugin_mgr/plugin.h
@@ -16,15 +16,6 @@
#include <vector>
#include <dlfcn.h>
-const std::string PLUGIN_ENABLED = "running";
-const std::string PLUGIN_DISABLED = "close";
-const std::string PLUGIN_STATE_ON = "available";
-const std::string PLUGIN_STATE_OFF = "unavailable";
-
-const std::string COLLECTOR_TEXT = "collector";
-const std::string SCENARIO_TEXT = "scenario";
-const std::string TUNE_TEXT = "tune";
-
enum class PluginType {
COLLECTOR,
SCENARIO,
@@ -67,17 +58,17 @@ public:
bool get_enabled() const {
return this->enabled;
}
- std::string get_info() const {
- std::string state_text = this->state ? PLUGIN_STATE_ON : PLUGIN_STATE_OFF;
- std::string run_text = this->enabled ? PLUGIN_ENABLED : PLUGIN_DISABLED;
- return name + "(" + state_text + ", " + run_text + ")";
- }
+ std::string get_info() const;
private:
std::string name;
std::string plugin_name;
PluginType type;
bool state;
bool enabled;
+ const static std::string PLUGIN_ENABLED;
+ const static std::string PLUGIN_DISABLED;
+ const static std::string PLUGIN_STATE_ON;
+ const static std::string PLUGIN_STATE_OFF;
};
class CollectorInstance : public Instance {
@@ -163,4 +154,4 @@ private:
std::string name;
};
-#endif
\ No newline at end of file
+#endif
diff --git a/src/plugin_mgr/plugin_manager.cpp b/src/plugin_mgr/plugin_manager.cpp
index ec9b85e..47f9451 100644
--- a/src/plugin_mgr/plugin_manager.cpp
+++ b/src/plugin_mgr/plugin_manager.cpp
@@ -16,15 +16,11 @@
#include <dirent.h>
#include <sys/stat.h>
+const std::string PluginManager::COLLECTOR_TEXT = "collector";
+const std::string PluginManager::SCENARIO_TEXT = "scenario";
+const std::string PluginManager::TUNE_TEXT = "tune";
const static int ST_MODE_MASK = 0777;
-bool PluginManager::check(char **deps, int len) {
- for (int i = 0; i < len; ++i) {
- if (!is_instance_exist(deps[i])) return false;
- }
- return true;
-}
-
void PluginManager::init(Config *config) {
plugin_types[COLLECTOR_TEXT] = PluginType::COLLECTOR;
plugin_types[SCENARIO_TEXT] = PluginType::SCENARIO;
@@ -32,24 +28,29 @@ void PluginManager::init(Config *config) {
this->config = config;
}
-bool PluginManager::remove(const std::string name) {
- if (!plugins.count(name)) return false;
- Plugin *plugin = plugins[name];
+bool PluginManager::remove(const std::string &name) {
+ if (!memory_store.is_plugin_exist(name)) return false;
+ Plugin *plugin = memory_store.get_plugin(name);
+ std::vector<std::string> instance_names;
for (int i = 0; i < plugin->get_instance_len(); ++i) {
Instance *instance = plugin->get_instance(i);
std::string iname = instance->get_name();
- instances.erase(iname);
+ if (dep_handler->have_dep(iname)) {
+ return false;
+ }
+ instance_names.emplace_back(iname);
+ }
+ for(auto &iname : instance_names) {
+ memory_store.delete_instance(iname);
dep_handler->del_node(iname);
- INFO("[PluginManager] instance : " << instance->get_name());
}
- delete plugin;
- plugins.erase(name);
+ memory_store.delete_plugin(name);
update_instance_state();
return true;
}
bool PluginManager::query_all_plugins(Message &res) {
- for (auto &t : plugins) {
- Plugin *p = t.second;
+ std::vector<Plugin*> all_plugins = memory_store.get_all_plugins();
+ for (auto &p : all_plugins) {
res.add_payload(p->get_name());
for (int i = 0; i < p->get_instance_len(); ++i) {
std::string info = p->get_instance(i)->get_info();
@@ -60,11 +61,11 @@ bool PluginManager::query_all_plugins(Message &res) {
}
bool PluginManager::query_plugin(std::string name, Message &res) {
- if (!plugins.count(name)) {
+ if (!memory_store.is_plugin_exist(name)) {
res.add_payload("no such plugin!");
return true;
}
- Plugin *plugin = plugins[name];
+ Plugin *plugin = memory_store.get_plugin(name);
res.add_payload(name);
for (int i = 0; i < plugin->get_instance_len(); ++i) {
std::string info = plugin->get_instance(i)->get_info();
@@ -106,10 +107,6 @@ std::vector<std::string> get_dep(T *interface) {
template<typename T, typename U>
void PluginManager::save_instance(Plugin *plugin, T *interface_list, int len) {
if (interface_list == nullptr) return;
- std::unordered_map<std::string, bool> tmp_interfaces;
- for (int i = 0; i < len; ++i) {
- tmp_interfaces[interface_list[i].get_name()] = true;
- }
for (int i = 0; i < len; ++i) {
T *interface = interface_list + i;
Instance *instance = new U();
@@ -127,7 +124,7 @@ void PluginManager::save_instance(Plugin *plugin, T *interface_list, int len) {
instance->set_state(dep_handler->get_node_state(name));
((U*)instance)->set_interface(interface);
DEBUG("[PluginManager] Instance: " << name.c_str());
- this->instances[name] = instance;
+ memory_store.add_instance(name, instance);
plugin->add_instance(instance);
}
}
@@ -163,17 +160,18 @@ bool PluginManager::load_instance(Plugin *plugin) {
}
void PluginManager::update_instance_state() {
- for (auto &v : instances) {
- if (dep_handler->get_node_state(v.first)) {
- v.second->set_state(true);
+ std::vector<Instance*> all_instances = memory_store.get_all_instances();
+ for (auto &instance : all_instances) {
+ if (dep_handler->get_node_state(instance->get_name())) {
+ instance->set_state(true);
} else {
- v.second->set_state(false);
+ instance->set_state(false);
}
}
}
bool PluginManager::load_plugin(const std::string name, PluginType type) {
- if (plugins.count(name)) {
+ if (memory_store.is_plugin_exist(name)) {
WARN("[PluginManager] " << name << " already loaded!");
return false;
}
@@ -188,21 +186,21 @@ bool PluginManager::load_plugin(const std::string name, PluginType type) {
delete plugin;
return false;
}
- plugins[name] = plugin;
+ memory_store.add_plugin(name, plugin);
return true;
}
-std::string generate_dot(std::unordered_map<std::string, Instance*> instances, const std::vector<std::vector<std::string>> &query) {
+std::string generate_dot(MemoryStore &memory_store, const std::vector<std::vector<std::string>> &query) {
std::string res;
res += "digraph G {\n";
std::unordered_map<std::string, std::vector<std::string>> sub_graph;
for (auto &vec : query) {
- Instance *instance = instances[vec[0]];
+ Instance *instance = memory_store.get_instance(vec[0]);
sub_graph[instance->get_plugin_name()].emplace_back(vec[0]);
if (vec.size() == 1) {
continue;
}
- instance = instances[vec[1]];
+ instance = memory_store.get_instance(vec[1]);
sub_graph[instance->get_plugin_name()].emplace_back(vec[1]);
res += vec[0] + "->" + vec[1] + ";";
}
@@ -229,7 +227,7 @@ bool PluginManager::query_top(std::string name, Message &res) {
res.add_payload("Instance not available!");
return false;
}
- std::string dot_text = generate_dot(instances, query);
+ std::string dot_text = generate_dot(memory_store, query);
res.add_payload(dot_text);
return true;
}
@@ -242,17 +240,17 @@ bool PluginManager::query_all_tops(Message &res) {
res.add_payload("No instance available!");
return false;
}
- std::string dot_text = generate_dot(instances, query);
+ std::string dot_text = generate_dot(memory_store, query);
res.add_payload(dot_text);
return true;
}
bool PluginManager::instance_enabled(std::string name) {
- if (!instances.count(name)) {
+ if (!memory_store.is_instance_exist(name)) {
WARN("[PluginManager] " << name << " instance can't load!");
return false;
}
- Instance *instance = instances[name];
+ Instance *instance = memory_store.get_instance(name);
if (!instance->get_state()) {
WARN("[PluginManager] " << name << " instance is unavailable, lacking dependencies!");
return false;
@@ -263,7 +261,7 @@ bool PluginManager::instance_enabled(std::string name) {
}
std::vector<std::string> pre_dependencies = dep_handler->get_pre_dependencies(name);
for (int i = pre_dependencies.size() - 1; i >= 0; --i) {
- instance = instances[pre_dependencies[i]];
+ instance = memory_store.get_instance(pre_dependencies[i]);
if (instance->get_enabled()) {
continue;
}
@@ -275,11 +273,11 @@ bool PluginManager::instance_enabled(std::string name) {
}
bool PluginManager::instance_disabled(std::string name) {
- if (!instances.count(name)) {
+ if (!memory_store.is_instance_exist(name)) {
WARN("[PluginManager] " << name << " instance can't load!");
return false;
}
- Instance *instance = instances[name];
+ Instance *instance = memory_store.get_instance(name);
if (!instance->get_state()) {
WARN("[PluginManager] " << name << " instance is unavailable, lacking dependencies!");
return false;
@@ -341,11 +339,11 @@ void PluginManager::pre_enable() {
EnableItem item = config->get_enable_list(i);
if (item.get_enabled()) {
std::string name = item.get_name();
- if (!plugins.count(name)) {
+ if (!memory_store.is_plugin_exist(name)) {
WARN("[PluginManager] plugin " << name << " cannot be enabled, because it does not exist.");
continue;
}
- Plugin *plugin = plugins[name];
+ Plugin *plugin = memory_store.get_plugin(name);
for (int j = 0; j < plugin->get_instance_len(); ++j) {
- instance_enabled(plugin->get_instance(i)->get_name());
+ instance_enabled(plugin->get_instance(j)->get_name());
}
@@ -380,7 +378,7 @@ static bool check_load_msg(Message &msg, std::unordered_map<std::string, PluginT
}
void* PluginManager::get_data_buffer(std::string name) {
- Instance *instance = instances[name];
+ Instance *instance = memory_store.get_instance(name);
switch (instance->get_type()) {
case PluginType::COLLECTOR: {
CollectorInterface *collector_interface = ((CollectorInstance*)instance)->get_interface();
@@ -397,7 +395,7 @@ void* PluginManager::get_data_buffer(std::string name) {
}
void PluginManager::instance_dep_check(std::string name, Message &res) {
- Plugin *plugin = plugins[name];
+ Plugin *plugin = memory_store.get_plugin(name);
for (int i = 0; i < plugin->get_instance_len(); ++i) {
std::string instance_name = plugin->get_instance(i)->get_name();
std::vector<std::vector<std::string>> query;
@@ -405,7 +403,7 @@ void PluginManager::instance_dep_check(std::string name, Message &res) {
std::vector<std::string> lack;
for (auto &item : query) {
if (item.size() < 2) continue;
- if (!instances.count(item[1])) {
+ if (!memory_store.is_instance_exist(item[1])) {
lack.emplace_back(item[1]);
}
}
@@ -437,7 +435,7 @@ static bool file_exist(const std::string &file_name) {
}
int PluginManager::run() {
- instance_run_handler->set_all_instance(&instances);
+ instance_run_handler->set_memory_store(&memory_store);
instance_run_handler->run();
while (true) {
Message msg;
@@ -483,7 +481,7 @@ int PluginManager::run() {
res.add_payload(name + " removed!");
INFO("[PluginManager] " << name << " removed!");
} else {
- res.add_payload(name + " does not exist!");
+ res.add_payload(name + " remove failed!");
}
break;
}
@@ -562,4 +560,4 @@ int PluginManager::run() {
res_msg->push(res);
}
return 0;
-}
\ No newline at end of file
+}
diff --git a/src/plugin_mgr/plugin_manager.h b/src/plugin_mgr/plugin_manager.h
index 415e5ea..f648931 100644
--- a/src/plugin_mgr/plugin_manager.h
+++ b/src/plugin_mgr/plugin_manager.h
@@ -13,11 +13,10 @@
#define PLUGIN_MGR_PLUGIN_MANAGER_H
#include "instance_run_handler.h"
-#include "plugin.h"
#include "config.h"
+#include "memory_store.h"
#include "dep_handler.h"
#include "message_manager.h"
-#include "logger.h"
#include <vector>
#include <queue>
#include <unordered_map>
@@ -39,7 +38,6 @@ public:
void* get_data_buffer(std::string name);
private:
void pre_load_plugin(PluginType type);
- bool check(char **deps, int len);
bool query_all_plugins(Message &res);
bool query_plugin(std::string name, Message &res);
bool query_top(std::string name, Message &res);
@@ -54,24 +52,23 @@ private:
bool load_instance(Plugin *plugin);
bool load_plugin(const std::string path, PluginType type);
void batch_load();
- bool remove(const std::string name);
+ bool remove(const std::string &name);
void batch_remove();
void add_list(Message &msg);
- bool is_instance_exist(std::string name) {
- return this->instances.find(name) != this->instances.end();
- }
void update_instance_state();
private:
InstanceRunHandler *instance_run_handler;
Config *config;
SafeQueue<Message> *handler_msg;
SafeQueue<Message> *res_msg;
- std::unordered_map<std::string, Plugin*> plugins;
- std::unordered_map<std::string, Instance*> instances;
- DepHandler *dep_handler;
- std::unordered_map<std::string, PluginType> plugin_types;
+ MemoryStore memory_store;
+ DepHandler *dep_handler;
+ std::unordered_map<std::string, PluginType> plugin_types;
+ static const std::string COLLECTOR_TEXT;
+ static const std::string SCENARIO_TEXT;
+ static const std::string TUNE_TEXT;
};
bool check_permission(std::string path, int mode);
-#endif
\ No newline at end of file
+#endif
--
2.33.0