From 22d40eb3b318226838306f6ce4b7c81095cfbe05 Mon Sep 17 00:00:00 2001 From: chenjingwen Date: Mon, 11 Dec 2023 20:15:08 +0800 Subject: [PATCH] backport patches to fix issues backport patches to fix issues such as grpc hangs. Signed-off-by: chenjingwen --- Backport-add-lock-limit-publish-API.patch | 48 +++++ ...ort-secDetectord-fix-a-grpc-hang-bug.patch | 107 ++++++++++ Backport-secUnsub-del-topic.patch | 189 ++++++++++++++++++ secDetector.spec | 8 +- 4 files changed, 351 insertions(+), 1 deletion(-) create mode 100644 Backport-add-lock-limit-publish-API.patch create mode 100644 Backport-secDetectord-fix-a-grpc-hang-bug.patch create mode 100644 Backport-secUnsub-del-topic.patch diff --git a/Backport-add-lock-limit-publish-API.patch b/Backport-add-lock-limit-publish-API.patch new file mode 100644 index 0000000..faae828 --- /dev/null +++ b/Backport-add-lock-limit-publish-API.patch @@ -0,0 +1,48 @@ +From b6705fe2d5b4aefdc0db16ae6ec9d75b69e8f421 Mon Sep 17 00:00:00 2001 +From: hurricane618 +Date: Wed, 6 Dec 2023 22:12:27 +0800 +Subject: [PATCH] add lock limit publish API + +call publish too quick, so add lock to limit it. + +Signed-off-by: hurricane618 +--- + observer_agent/service/main.cpp | 8 ++++++-- + 1 file changed, 6 insertions(+), 2 deletions(-) + +diff --git a/observer_agent/service/main.cpp b/observer_agent/service/main.cpp +index bd01690..8c24345 100644 +--- a/observer_agent/service/main.cpp ++++ b/observer_agent/service/main.cpp +@@ -80,6 +80,10 @@ static void sig_handler(int sig) + } + static bool debug = false; + ++static std::string server_address("unix:///var/run/secDetector.sock"); ++PubSubClient client; ++std::mutex pub_mutex; ++ + static void push_log(int type, const std::string &content) + { + if ((topic_mask & type) == 0) +@@ -92,8 +96,7 @@ static void push_log(int type, const std::string &content) + } + + // push to grpc +- std::string server_address("unix:///var/run/secDetector.sock"); +- PubSubClient client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); ++ std::lock_guard lock(pub_mutex); + client.Publish(type, content); + } + +@@ -179,6 +182,7 @@ int main(int argc, char *argv[]) + std::thread thread_grpc = std::thread(RunServer); + std::thread thread_ebpf_process = std::thread(StartProcesseBPFProg, ebpf_cb, ringbuf_size_bytes, topic_mask); + std::thread thread_ebpf_file = std::thread(StartFileBPFProg, ebpf_cb, ringbuf_size_bytes, topic_mask); ++ client.init(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); + + while (exiting == 0) + { +-- +2.33.0 + diff --git a/Backport-secDetectord-fix-a-grpc-hang-bug.patch b/Backport-secDetectord-fix-a-grpc-hang-bug.patch new file mode 100644 index 0000000..4d70734 --- /dev/null +++ b/Backport-secDetectord-fix-a-grpc-hang-bug.patch @@ -0,0 +1,107 @@ +From f531f56ee36aecd3bb9eae527551eb8eff8c9457 Mon Sep 17 00:00:00 2001 +From: chenjingwen +Date: Mon, 11 Dec 2023 19:52:42 +0800 +Subject: [PATCH 2/2] secDetectord: fix a grpc hang bug + +break connection loop before shutdown +so that shutdown won't hang. + +Signed-off-by: chenjingwen +--- + observer_agent/grpc_comm/grpc_api.h | 1 + + observer_agent/grpc_comm/server.cpp | 30 ++++++++++++++++++++++++++--- + 2 files changed, 28 insertions(+), 3 deletions(-) + +diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h +index 4bde109..27a9139 100644 +--- a/observer_agent/grpc_comm/grpc_api.h ++++ b/observer_agent/grpc_comm/grpc_api.h +@@ -26,6 +26,7 @@ class PubSubServiceImpl final : public SubManager::Service + grpc::Status Subscribe(ServerContext *context, const SubscribeRequest *request, ServerWriter *writer); + grpc::Status Publish(ServerContext *context, const PublishRequest *request, Message *response); + grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response); ++ void CloseAllConnection(void); + + private: + std::unordered_map> suber_topic_; +diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp +index cce5131..b47b1aa 100644 +--- a/observer_agent/grpc_comm/server.cpp ++++ b/observer_agent/grpc_comm/server.cpp +@@ -16,6 +16,7 @@ + #include "comm_api.grpc.pb.h" + #include + #include ++#include + + using data_comm::Message; + using data_comm::PublishRequest; +@@ -30,9 +31,23 @@ using grpc::ServerWriter; + #define MAX_CONNECTION 5 + #define CHECK_TIME 60 + ++static bool killed = false; ++ + class PubSubServiceImpl final : public SubManager::Service + { + public: ++ void CloseAllConnection(void) ++ { ++ std::lock_guard lk(wait_mutex); ++ ++ for (int i = 0; i < MAX_CONNECTION; i++) { ++ connect_status[i] = false; ++ } ++ ++ killed = true; ++ cv.notify_all(); ++ } ++ + grpc::Status Subscribe(ServerContext *context, const SubscribeRequest *request, + ServerWriter *writer) override + { +@@ -124,7 +139,7 @@ class PubSubServiceImpl final : public SubManager::Service + } + return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!"); + } +- sleep(CHECK_TIME); ++ WaitKeeplive(); + } + return grpc::Status::OK; + } +@@ -203,21 +218,30 @@ class PubSubServiceImpl final : public SubManager::Service + std::unordered_map *>> suber_writer_; + std::unordered_map> suber_connection_; + std::mutex sub_mutex; ++ std::mutex wait_mutex; ++ std::condition_variable cv; + int connection_num = 0; + bool connect_status[MAX_CONNECTION] = {false}; ++ ++ void WaitKeeplive(void) ++ { ++ std::unique_lock lk(wait_mutex); ++ cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; }); ++ } + }; + +-std::unique_ptr server; ++static std::unique_ptr server; ++static PubSubServiceImpl service; + + void StopServer() + { ++ service.CloseAllConnection(); + server->Shutdown(); + } + + void RunServer() + { + std::string server_address("unix:///var/run/secDetector.sock"); +- PubSubServiceImpl service; + + ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); +-- +2.33.0 + diff --git a/Backport-secUnsub-del-topic.patch b/Backport-secUnsub-del-topic.patch new file mode 100644 index 0000000..c136e5d --- /dev/null +++ b/Backport-secUnsub-del-topic.patch @@ -0,0 +1,189 @@ +From 4a58573122e2c677c427cb43bb02e6f055fdf391 Mon Sep 17 00:00:00 2001 +From: zgzxx +Date: Mon, 11 Dec 2023 20:57:58 +0800 +Subject: [PATCH] secUnsub del topic + +--- + examples/python/client.py | 4 +-- + lib/secDetector_sdk.cpp | 8 ++---- + lib/secDetector_sdk.h | 2 +- + observer_agent/grpc_comm/client.cpp | 3 +-- + observer_agent/grpc_comm/client_sub_demo.cpp | 2 +- + observer_agent/grpc_comm/grpc_api.h | 2 +- + .../grpc_comm/protos/comm_api.proto | 3 +-- + observer_agent/grpc_comm/server.cpp | 27 ++++++++----------- + 8 files changed, 20 insertions(+), 31 deletions(-) + +diff --git a/examples/python/client.py b/examples/python/client.py +index 312384d..3fb95b4 100644 +--- a/examples/python/client.py ++++ b/examples/python/client.py +@@ -31,7 +31,7 @@ g_cli_reader_lock = threading.Lock() + + secDetectorsdklib.secSub.argtypes = [ctypes.c_int] + secDetectorsdklib.secSub.restype = ctypes.c_void_p +-secDetectorsdklib.secUnsub.argtypes = [ctypes.c_int, ctypes.c_void_p] ++secDetectorsdklib.secUnsub.argtypes = [ctypes.c_void_p] + secDetectorsdklib.secUnsub.restype = None + secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int] + secDetectorsdklib.secReadFrom.restype = None +@@ -66,7 +66,7 @@ def thread_func_unsub(num=0): + g_cli_reader_lock.acquire() + try: + g_read_flag = False +- secDetectorsdklib.secUnsub(1, g_cli_reader) ++ secDetectorsdklib.secUnsub(g_cli_reader) + finally: + g_cli_reader_lock.release() + print("client thread_func_unsub end") +diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp +index 6f47f41..6b00953 100644 +--- a/lib/secDetector_sdk.cpp ++++ b/lib/secDetector_sdk.cpp +@@ -62,13 +62,9 @@ void *secSub(const int topic) + return ret_reader; + } + +-void secUnsub(const int topic, void *reader) ++void secUnsub(void *reader) + { + PubSubClient *cur_client; +- if (topic <= 0 || topic > ALLTOPIC) { +- printf("lib secUnsub failed, topic:%d is error\n", topic); +- return; +- } + + if (!reader) + return; +@@ -77,7 +73,7 @@ void secUnsub(const int topic, void *reader) + Readmap::iterator iter = g_reader_map.find(reader); + if (iter != g_reader_map.end()) { + cur_client = iter->second.second; +- cur_client->UnSubscribe(topic); ++ cur_client->UnSubscribe(); + g_reader_map.erase(iter); + reader = NULL; + delete cur_client; +diff --git a/lib/secDetector_sdk.h b/lib/secDetector_sdk.h +index abf112b..92ef5b4 100644 +--- a/lib/secDetector_sdk.h ++++ b/lib/secDetector_sdk.h +@@ -18,7 +18,7 @@ + #define SECDETECTOR_SDK_H + + void *secSub(const int topic); +-void secUnsub(const int topic, void *reader); ++void secUnsub(void *reader); + void secReadFrom(void *reader, char *data, int data_len); + + #endif +diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp +index 0dd02f9..5cf8cf2 100644 +--- a/observer_agent/grpc_comm/client.cpp ++++ b/observer_agent/grpc_comm/client.cpp +@@ -87,10 +87,9 @@ void PubSubClient::Publish(const int topic, const std::string &content) + } + } + +-void PubSubClient::UnSubscribe(const int topic) ++void PubSubClient::UnSubscribe(void) + { + UnSubscribeRequest request; +- request.set_topic(topic); + request.set_sub_name(uuid_str); + + ClientContext unsub_context; +diff --git a/observer_agent/grpc_comm/client_sub_demo.cpp b/observer_agent/grpc_comm/client_sub_demo.cpp +index fbf27ad..550b503 100644 +--- a/observer_agent/grpc_comm/client_sub_demo.cpp ++++ b/observer_agent/grpc_comm/client_sub_demo.cpp +@@ -34,7 +34,7 @@ int main(int argc, char **argv) + some_data = client.ReadFrom(cli_reader); + std::cout << "loop whz: " << some_data << std::endl; + } +- client.UnSubscribe(std::stoi(argv[1])); ++ client.UnSubscribe(); + + return 0; + } +diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h +index 27a9139..c5b43cc 100644 +--- a/observer_agent/grpc_comm/grpc_api.h ++++ b/observer_agent/grpc_comm/grpc_api.h +@@ -48,7 +48,7 @@ class PubSubClient + void init(std::shared_ptr channel); + std::unique_ptr> Subscribe(const int topic); + void Publish(const int topic, const std::string &content); +- void UnSubscribe(const int topic); ++ void UnSubscribe(void); + std::string ReadFrom(std::unique_ptr> &reader); + + private: +diff --git a/observer_agent/grpc_comm/protos/comm_api.proto b/observer_agent/grpc_comm/protos/comm_api.proto +index 6c84865..cf1e445 100644 +--- a/observer_agent/grpc_comm/protos/comm_api.proto ++++ b/observer_agent/grpc_comm/protos/comm_api.proto +@@ -13,8 +13,7 @@ message SubscribeRequest { + } + + message UnSubscribeRequest { +- int32 topic = 1; +- string sub_name = 2; ++ string sub_name = 1; + } + + message PublishRequest { +diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp +index b47b1aa..938d09c 100644 +--- a/observer_agent/grpc_comm/server.cpp ++++ b/observer_agent/grpc_comm/server.cpp +@@ -176,7 +176,6 @@ class PubSubServiceImpl final : public SubManager::Service + + grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response) override + { +- int cli_topic = request->topic(); + std::string cli_name = request->sub_name(); + int i = 0; + int unsub_flag = 0; +@@ -189,27 +188,23 @@ class PubSubServiceImpl final : public SubManager::Service + + std::lock_guard lock(sub_mutex); + +- for (auto topic_item : suber_topic_[cli_name]) ++ std::unordered_map>::iterator iter = suber_topic_.find(cli_name); ++ if (iter != suber_topic_.end()) + { +- if (topic_item == cli_topic) +- { +- suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); +- suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); +- connect_status[suber_connection_[cli_name].at(i)] = false; +- suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); +- connection_num--; +- unsub_flag = 1; +- break; +- } +- i++; ++ suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); ++ suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); ++ connect_status[suber_connection_[cli_name].at(i)] = false; ++ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); ++ connection_num--; ++ unsub_flag = 1; + } + + if (!unsub_flag) + { +- response->set_text("don't exist the topic: " + std::to_string(cli_topic)); +- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe topic!"); ++ response->set_text("don't exist the reader"); ++ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe reader!"); + } +- response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); ++ response->set_text("UnSubscribe success!"); + return grpc::Status::OK; + } + +-- +2.33.0 + diff --git a/secDetector.spec b/secDetector.spec index 44b6549..34287a6 100644 --- a/secDetector.spec +++ b/secDetector.spec @@ -5,7 +5,7 @@ Name : secDetector Summary : OS Security Intrusion Detection System Version : 1.0 -Release : 9 +Release : 10 License : GPL-2.0 Source0 : %{name}-v%{version}.tar.gz BuildRequires: kernel-devel kernel-headers @@ -34,6 +34,9 @@ Patch0016: Backport-add-nullptr-check-in-Subscribe.patch Patch0017: Backport-modify-for-multiple-sub-in-the-same-process.patch Patch0018: Backport-creatfile-check-op-intent-value.patch Patch0019: Backport-createfile-check-f_mode-and-fix-typo.patch +Patch0021: Backport-add-lock-limit-publish-API.patch +Patch0022: Backport-secDetectord-fix-a-grpc-hang-bug.patch +Patch0023: Backport-secUnsub-del-topic.patch %description OS Security Intrusion Detection System @@ -107,6 +110,9 @@ rm -rf %{buildroot} %attr(0644,root,root) /usr/include/secDetector/secDetector_topic.h %changelog +* Mon Dec 11 2023 chenjingwen6 1.0-10 +- backport some patches to fix issue such as grpc hangs + * Sat Dec 9 2023 zhangguangzhi 1.0-9 - backport some patches