backport some fix patchs

fix some bugs

Signed-off-by: hurricane618 <hurricane618@hotmail.com>
This commit is contained in:
hurricane618 2023-12-05 00:05:34 +08:00
parent c73d203ddf
commit d224ac9cae
7 changed files with 714 additions and 1 deletions

View File

@ -0,0 +1,166 @@
From aca8d49dea577cac55a6b9704770882f18649fb9 Mon Sep 17 00:00:00 2001
From: hurricane618 <hurricane618@hotmail.com>
Date: Fri, 1 Dec 2023 01:26:18 +0800
Subject: [PATCH 3/6] add handle cleanup and refactor Subscribe/UnSubscribe
1. fix error residue handle
2. refactor Subscribe and UnSubscribe
Signed-off-by: hurricane618 <hurricane618@hotmail.com>
---
observer_agent/grpc_comm/Makefile | 8 +----
observer_agent/grpc_comm/grpc_api.h | 9 ++++-
observer_agent/grpc_comm/server.cpp | 51 +++++++++++++++++++++++++++--
3 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/observer_agent/grpc_comm/Makefile b/observer_agent/grpc_comm/Makefile
index 4dbaa46..3c87ad8 100644
--- a/observer_agent/grpc_comm/Makefile
+++ b/observer_agent/grpc_comm/Makefile
@@ -38,13 +38,7 @@ PROTOS_PATH = ./protos
vpath %.proto $(PROTOS_PATH)
-all: system-check client server client_pub_demo client_sub_demo server_demo
-
-client: comm_api.pb.o comm_api.grpc.pb.o client.o
- @echo "only compile client don't link"
-
-server: comm_api.pb.o comm_api.grpc.pb.o server.o
- @echo "only compile server don't link"
+all: system-check client_pub_demo client_sub_demo server_demo
client_pub_demo: comm_api.pb.o comm_api.grpc.pb.o client.o client_pub_demo.o
$(CXX) $^ $(LDFLAGS) -o $@
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
index 7b19552..44d4aa9 100644
--- a/observer_agent/grpc_comm/grpc_api.h
+++ b/observer_agent/grpc_comm/grpc_api.h
@@ -18,6 +18,8 @@ using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerWriter;
+#define MAX_CONNECTION 5
+
class PubSubServiceImpl final : public SubManager::Service
{
public:
@@ -26,7 +28,12 @@ class PubSubServiceImpl final : public SubManager::Service
grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response);
private:
- std::unordered_map<int, std::vector<ServerWriter<Message> *>> subscribers_;
+ std::unordered_map<std::string, std::vector<int>> suber_topic_;
+ std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
+ std::unordered_map<std::string, std::vector<int>> suber_connection_;
+ std::mutex sub_mutex;
+ int connection_num = 0;
+ bool connect_status[MAX_CONNECTION] = {false};
};
void StopServer();
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
index 04e8163..54ae66f 100644
--- a/observer_agent/grpc_comm/server.cpp
+++ b/observer_agent/grpc_comm/server.cpp
@@ -28,6 +28,7 @@ using grpc::ServerContext;
using grpc::ServerWriter;
#define MAX_CONNECTION 5
+#define CHECK_TIME 60
class PubSubServiceImpl final : public SubManager::Service
{
@@ -38,6 +39,8 @@ class PubSubServiceImpl final : public SubManager::Service
int cli_topic = request->topic();
std::string cli_name = request->sub_name();
Message msg;
+ Message keepalive_msg;
+ int i = 0, tmp_index;
if (connection_num >= MAX_CONNECTION) {
msg.set_text("over max connection number!");
@@ -65,8 +68,28 @@ class PubSubServiceImpl final : public SubManager::Service
sub_mutex.lock();
+ for (tmp_index = 0; tmp_index < MAX_CONNECTION; tmp_index++)
+ {
+ if (!connect_status[tmp_index])
+ break;
+ }
+
+ if (tmp_index == MAX_CONNECTION)
+ {
+ sub_mutex.unlock();
+ msg.set_text("multi-process max connection number!");
+ if (!writer->Write(msg))
+ {
+ std::cerr << "Failed to write the initial message" << std::endl;
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
+ }
+ return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic");
+ }
+
suber_topic_[cli_name].push_back(cli_topic);
suber_writer_[cli_name].push_back(writer);
+ suber_connection_[cli_name].push_back(tmp_index);
+ connect_status[tmp_index] = true;
connection_num++;
sub_mutex.unlock();
@@ -78,9 +101,29 @@ class PubSubServiceImpl final : public SubManager::Service
return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
}
- // ToDo: set some condition to break loop
- while (1)
+ keepalive_msg.set_text("keepalive");
+ while (connect_status[tmp_index])
{
+ sleep(CHECK_TIME);
+ if (!writer->Write(keepalive_msg))
+ {
+ for (auto topic_item : suber_topic_[cli_name])
+ {
+ if (topic_item == cli_topic)
+ {
+ sub_mutex.lock();
+ suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
+ suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
+ connect_status[suber_connection_[cli_name].at(i)] = false;
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
+ connection_num--;
+ sub_mutex.unlock();
+ break;
+ }
+ i++;
+ }
+ return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
+ }
}
return grpc::Status::OK;
}
@@ -136,6 +179,8 @@ class PubSubServiceImpl final : public SubManager::Service
{
suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
+ connect_status[suber_connection_[cli_name].at(i)] = false;
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
connection_num--;
unsub_flag = 1;
break;
@@ -155,8 +200,10 @@ class PubSubServiceImpl final : public SubManager::Service
private:
std::unordered_map<std::string, std::vector<int>> suber_topic_;
std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
+ std::unordered_map<std::string, std::vector<int>> suber_connection_;
std::mutex sub_mutex;
int connection_num = 0;
+ bool connect_status[MAX_CONNECTION] = {false};
};
std::unique_ptr<Server> server;
--
2.33.0

View File

@ -0,0 +1,78 @@
From ce029db530429609847b38ae7bae2428064e3a27 Mon Sep 17 00:00:00 2001
From: hurricane618 <hurricane618@hotmail.com>
Date: Mon, 4 Dec 2023 10:06:55 +0800
Subject: [PATCH 5/6] add nullptr check in Subscribe
fix leak of nullptr problem
Signed-off-by: hurricane618 <hurricane618@hotmail.com>
---
observer_agent/grpc_comm/client.cpp | 10 ++++++++--
observer_agent/grpc_comm/server.cpp | 15 ++++++++-------
2 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp
index ecb54ae..84b5c96 100644
--- a/observer_agent/grpc_comm/client.cpp
+++ b/observer_agent/grpc_comm/client.cpp
@@ -43,16 +43,22 @@ std::unique_ptr<ClientReader<Message>> PubSubClient::Subscribe(const int topic)
SubscribeRequest request;
request.set_topic(topic);
request.set_sub_name(uuid_str);
+ std::string ret_info;
Message msg;
SubFlag = true;
std::unique_ptr<ClientReader<Message>> reader = stub_->Subscribe(&context, request);
- if (reader == nullptr)
+ ret_info = ReadFrom(reader);
+
+ if (ret_info.substr(0, 6) == "topic:")
+ {
+ std::cout << "Success subscribe." << std::endl;
+ return reader;
+ } else
{
std::cerr << "Failed to subscribe." << std::endl;
return nullptr;
}
- return reader;
}
void PubSubClient::Publish(const int topic, const std::string &content)
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
index 54ae66f..d53866f 100644
--- a/observer_agent/grpc_comm/server.cpp
+++ b/observer_agent/grpc_comm/server.cpp
@@ -86,6 +86,14 @@ class PubSubServiceImpl final : public SubManager::Service
return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic");
}
+ msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!");
+ if (!writer->Write(msg))
+ {
+ std::cerr << "Failed to write the initial message" << std::endl;
+ sub_mutex.unlock();
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
+ }
+
suber_topic_[cli_name].push_back(cli_topic);
suber_writer_[cli_name].push_back(writer);
suber_connection_[cli_name].push_back(tmp_index);
@@ -94,13 +102,6 @@ class PubSubServiceImpl final : public SubManager::Service
sub_mutex.unlock();
- msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!");
- if (!writer->Write(msg))
- {
- std::cerr << "Failed to write the initial message" << std::endl;
- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
- }
-
keepalive_msg.set_text("keepalive");
while (connect_status[tmp_index])
{
--
2.33.0

View File

@ -0,0 +1,127 @@
From a4b7e62a1a24948a8f436ba3b5763317315e081a Mon Sep 17 00:00:00 2001
From: hurricane618 <hurricane618@hotmail.com>
Date: Wed, 29 Nov 2023 23:13:39 +0800
Subject: [PATCH 1/6] fix memleak bug in secDetector_program_action
fix memleak
Signed-off-by: hurricane618 <hurricane618@hotmail.com>
---
.../secDetector_program_action.c | 52 +++++--------------
1 file changed, 13 insertions(+), 39 deletions(-)
diff --git a/kerneldriver/cases/program_action/secDetector_program_action.c b/kerneldriver/cases/program_action/secDetector_program_action.c
index 4f8a555..504a36d 100644
--- a/kerneldriver/cases/program_action/secDetector_program_action.c
+++ b/kerneldriver/cases/program_action/secDetector_program_action.c
@@ -42,6 +42,7 @@
#include "secDetector_manager.h"
#include "secDetector_response.h"
+#include "secDetector_analyze.h"
#include <secDetector_module_type.h>
#define PATH_LEN 512
@@ -83,43 +84,6 @@ struct process_info {
int umask;
};
-int get_timestamp_str(char **ret_str)
-{
- struct timespec64 ts;
- struct tm stm;
- char *stm_str;
- int stm_str_len = 0;
-
- ktime_get_real_ts64(&ts);
- time64_to_tm(ts.tv_sec, 0, &stm);
-
- stm_str = (char *)kzalloc(TIME_STR_MAX_LEN, GFP_ATOMIC);
- if (stm_str == NULL) {
- pr_err("kzalloc failed\n");
- *ret_str = NULL;
- return 0;
- }
-
- stm_str_len = scnprintf(stm_str, TIME_STR_MAX_LEN,
- "timestamp=%04ld%02d%02d.%02d:%02d:%02d ",
- stm.tm_year + 1900, stm.tm_mon + 1, stm.tm_mday, stm.tm_hour, stm.tm_min, stm.tm_sec);
- if (stm_str_len <= 0) {
- pr_err("scnprintf failed\n");
- kfree(stm_str);
- *ret_str = NULL;
- return 0;
- }
-
- *ret_str = kstrdup(stm_str, GFP_KERNEL);
- if (*ret_str == NULL) {
- pr_err("kstrdup failed\n");
- stm_str_len = 0;
- }
-
- kfree(stm_str);
- return stm_str_len;
-}
-
char *get_process_path(struct task_struct *p, char *pathname, int len)
{
char *process_path = NULL;
@@ -276,7 +240,7 @@ static int ptrace_attach_pre_handler(struct secDetector_workflow *wf,
response_data_t log;
if (!pi) {
- pr_warn("get_common_process_info by fork failed\n");
+ pr_err("get_common_process_info by fork failed\n");
return 0;
}
@@ -302,6 +266,10 @@ static int ptrace_attach_pre_handler(struct secDetector_workflow *wf,
log.report_data.type = 0x00000800;
log.report_data.len = BUF_SIZE;
log.report_data.text = kzalloc(BUF_SIZE, GFP_ATOMIC);
+ if (!log.report_data.text) {
+ pr_err("log.report_data.text kzalloc failed!\n");
+ return 0;
+ }
snprintf(log.report_data.text, BUF_SIZE,
"%s event_type=call_api uid=%d exe=%s pid=%d comm=%s tgid=%d ppid=%d pcomm=%s pgid=%d sid=%d nodename=%s pns=%u root_pns=%u api_name=%s api_arg=[attach_task_pid=%d cur_task_pid=%d request=%ld addr=%lu flags=%lu]\n",
timestamp, pi->uid, pi->exe, pi->pid, pi->comm, pi->tgid, pi->ppid, pi->pcomm, pi->pgid, pi->sid, pi->nodename, pi->pns, pi->root_pns,
@@ -309,6 +277,7 @@ static int ptrace_attach_pre_handler(struct secDetector_workflow *wf,
secDetector_report(&log);
kfree(log.report_data.text);
+ kfree(timestamp);
put_common_process_info(pi);
return 0;
@@ -323,7 +292,7 @@ static int do_pipe2_pre_handler(struct secDetector_workflow *wf,
response_data_t log;
if (!pi) {
- pr_warn("get_common_process_info by fork failed\n");
+ pr_err("get_common_process_info by fork failed\n");
return 0;
}
timestamp_len = get_timestamp_str(&timestamp);
@@ -331,6 +300,10 @@ static int do_pipe2_pre_handler(struct secDetector_workflow *wf,
log.report_data.type = 0x00000200;
log.report_data.len = BUF_SIZE;
log.report_data.text = kzalloc(BUF_SIZE, GFP_ATOMIC);
+ if (!log.report_data.text) {
+ pr_err("log.report_data.text kzalloc failed!\n");
+ return 0;
+ }
snprintf(log.report_data.text, BUF_SIZE,
"%s event_type=createpipe uid=%d exe=%s pid=%d comm=%s tgid=%d ppid=%d pcomm=%s pgid=%d sid=%d nodename=%s pns=%u root_pns=%u dfd= pipe_name=%s\n",
timestamp, pi->uid, pi->exe, pi->pid, pi->comm, pi->tgid, pi->ppid, pi->pcomm, pi->pgid, pi->sid, pi->nodename, pi->pns, pi->root_pns,
@@ -338,6 +311,7 @@ static int do_pipe2_pre_handler(struct secDetector_workflow *wf,
secDetector_report(&log);
kfree(log.report_data.text);
+ kfree(timestamp);
put_common_process_info(pi);
return 0;
--
2.33.0

View File

@ -0,0 +1,35 @@
From 859b4a40626e870f83dda00e2e3ba40bf0558224 Mon Sep 17 00:00:00 2001
From: hurricane618 <hurricane618@hotmail.com>
Date: Thu, 30 Nov 2023 11:31:24 +0800
Subject: [PATCH 2/6] fix timestamp memleak
timestamp need to free in error process
Signed-off-by: hurricane618 <hurricane618@hotmail.com>
---
kerneldriver/cases/program_action/secDetector_program_action.c | 2 ++
1 file changed, 2 insertions(+)
diff --git a/kerneldriver/cases/program_action/secDetector_program_action.c b/kerneldriver/cases/program_action/secDetector_program_action.c
index 504a36d..1f0749a 100644
--- a/kerneldriver/cases/program_action/secDetector_program_action.c
+++ b/kerneldriver/cases/program_action/secDetector_program_action.c
@@ -268,6 +268,7 @@ static int ptrace_attach_pre_handler(struct secDetector_workflow *wf,
log.report_data.text = kzalloc(BUF_SIZE, GFP_ATOMIC);
if (!log.report_data.text) {
pr_err("log.report_data.text kzalloc failed!\n");
+ kfree(timestamp);
return 0;
}
snprintf(log.report_data.text, BUF_SIZE,
@@ -302,6 +303,7 @@ static int do_pipe2_pre_handler(struct secDetector_workflow *wf,
log.report_data.text = kzalloc(BUF_SIZE, GFP_ATOMIC);
if (!log.report_data.text) {
pr_err("log.report_data.text kzalloc failed!\n");
+ kfree(timestamp);
return 0;
}
snprintf(log.report_data.text, BUF_SIZE,
--
2.33.0

View File

@ -0,0 +1,120 @@
From d9a4c1cf011ab3d26b88229b5072ebbf6017893a Mon Sep 17 00:00:00 2001
From: zgzxx <zhangguangzhi3@huawei.com>
Date: Fri, 1 Dec 2023 17:36:11 +0800
Subject: [PATCH 4/6] lib modify for unsub
---
examples/python/client.py | 10 ++++++----
lib/secDetector_sdk.cpp | 19 +++++++++++++------
observer_agent/grpc_comm/server.cpp | 2 +-
3 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/examples/python/client.py b/examples/python/client.py
index d6dd7aa..312384d 100644
--- a/examples/python/client.py
+++ b/examples/python/client.py
@@ -36,9 +36,11 @@ secDetectorsdklib.secUnsub.restype = None
secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int]
secDetectorsdklib.secReadFrom.restype = None
+g_read_flag = True
def thread_func_sub_and_read(num=0):
global g_cli_reader
+ global g_read_flag
cli_reader = secDetectorsdklib.secSub(1)
g_cli_reader_lock.acquire()
@@ -50,10 +52,7 @@ def thread_func_sub_and_read(num=0):
secDetectorsdklib.secReadFrom(cli_reader, data, data_len)
print("client read data:{}".format(data.value.decode()))
- while True:
- if data.value.decode() == 'end':
- print("client received end")
- break
+ while g_read_flag:
time.sleep(3)
secDetectorsdklib.secReadFrom(cli_reader, data, data_len)
print("client while read data:{}".format(data.value.decode()))
@@ -62,8 +61,11 @@ def thread_func_sub_and_read(num=0):
def thread_func_unsub(num=0):
global g_cli_reader
+ global g_read_flag
+
g_cli_reader_lock.acquire()
try:
+ g_read_flag = False
secDetectorsdklib.secUnsub(1, g_cli_reader)
finally:
g_cli_reader_lock.release()
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
index ee76079..847dd2f 100644
--- a/lib/secDetector_sdk.cpp
+++ b/lib/secDetector_sdk.cpp
@@ -55,14 +55,14 @@ void secUnsub(const int topic, void *reader)
}
if (!reader)
- return;
+ return;
- g_client.Publish(topic, "end");
g_client.UnSubscribe(topic);
Readmap::iterator iter = g_reader_map.find(reader);
if (iter != g_reader_map.end()) {
g_reader_map.erase(iter);
+ reader = NULL;
}
}
@@ -70,13 +70,20 @@ void secReadFrom(void *reader, char *data, int data_len)
{
string msg("");
- if (!reader || !data || data_len <= 1)
- return;
+ if (!data || data_len <= 1)
+ return
+
+ memset(data, 0, data_len);
+
+ if (!reader)
+ return;
Readmap::iterator iter = g_reader_map.find(reader);
- if (iter != g_reader_map.end()) {
+ if (iter != g_reader_map.end()) {
msg = g_client.ReadFrom(iter->second);
- }
+ if (msg == "keepalive")
+ return;
+ }
strncpy(data, msg.c_str(), data_len - 1);
}
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
index 54ae66f..3340dfa 100644
--- a/observer_agent/grpc_comm/server.cpp
+++ b/observer_agent/grpc_comm/server.cpp
@@ -104,7 +104,6 @@ class PubSubServiceImpl final : public SubManager::Service
keepalive_msg.set_text("keepalive");
while (connect_status[tmp_index])
{
- sleep(CHECK_TIME);
if (!writer->Write(keepalive_msg))
{
for (auto topic_item : suber_topic_[cli_name])
@@ -124,6 +123,7 @@ class PubSubServiceImpl final : public SubManager::Service
}
return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
}
+ sleep(CHECK_TIME);
}
return grpc::Status::OK;
}
--
2.33.0

View File

@ -0,0 +1,178 @@
From 511ad1d3ce2c9e44621c9b508b3e44d3d5098f0e Mon Sep 17 00:00:00 2001
From: zgzxx <zhangguangzhi3@huawei.com>
Date: Mon, 4 Dec 2023 15:31:22 +0800
Subject: [PATCH 6/6] modify for multiple sub in the same process
---
lib/secDetector_sdk.cpp | 54 +++++++++++++++++++++--------
observer_agent/grpc_comm/client.cpp | 12 +++++++
observer_agent/grpc_comm/grpc_api.h | 2 ++
3 files changed, 53 insertions(+), 15 deletions(-)
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
index 847dd2f..6f47f41 100644
--- a/lib/secDetector_sdk.cpp
+++ b/lib/secDetector_sdk.cpp
@@ -16,15 +16,17 @@
#include <string>
#include <iostream>
+#include <list>
+#include <mutex>
#include "../observer_agent/grpc_comm/grpc_api.h"
#define ALLTOPIC 0x00FFFFFF
using namespace std;
static string server_address("unix:///var/run/secDetector.sock");
-static PubSubClient g_client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
-using Readmap = map<void *, unique_ptr<ClientReader<Message>>>;
+using Readmap = map<void *, std::pair<unique_ptr<ClientReader<Message>>, PubSubClient *>>;
static Readmap g_reader_map;
+static mutex g_connect_mtx;
#ifdef __cplusplus
extern "C" {
@@ -32,60 +34,82 @@ extern "C" {
void *secSub(const int topic)
{
+ PubSubClient *cur_client;
if (topic <= 0 || topic > ALLTOPIC) {
- printf("secSub failed, topic:%d is error\n", topic);
+ printf("lib secSub failed, topic:%d is error\n", topic);
return NULL;
}
+ g_connect_mtx.lock();
+ std::shared_ptr<Channel> channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
+ cur_client = new(PubSubClient);
+ if (cur_client == nullptr) {
+ g_connect_mtx.unlock();
+ return NULL;
+ }
+ cur_client->init(channel);
+ unique_ptr<ClientReader<Message>> reader = cur_client->Subscribe(topic);
- unique_ptr<ClientReader<Message>> reader = g_client.Subscribe(topic);
-
- if (!reader)
+ if (!reader) {
+ printf("lib secSub failed, get reader null\n");
+ delete cur_client;
+ g_connect_mtx.unlock();
return NULL;
+ }
void * ret_reader = static_cast<void *>(reader.get());
- g_reader_map.insert(Readmap::value_type(ret_reader, move(reader)));
+ g_reader_map.insert(Readmap::value_type(ret_reader, std::make_pair(move(reader), cur_client)));
+ g_connect_mtx.unlock();
return ret_reader;
}
void secUnsub(const int topic, void *reader)
{
+ PubSubClient *cur_client;
if (topic <= 0 || topic > ALLTOPIC) {
- printf("secUnsub failed, topic:%d is error\n", topic);
+ printf("lib secUnsub failed, topic:%d is error\n", topic);
return;
}
if (!reader)
return;
- g_client.UnSubscribe(topic);
-
+ g_connect_mtx.lock();
Readmap::iterator iter = g_reader_map.find(reader);
if (iter != g_reader_map.end()) {
+ cur_client = iter->second.second;
+ cur_client->UnSubscribe(topic);
g_reader_map.erase(iter);
reader = NULL;
+ delete cur_client;
}
+ g_connect_mtx.unlock();
}
void secReadFrom(void *reader, char *data, int data_len)
{
string msg("");
+ PubSubClient *cur_client;
if (!data || data_len <= 1)
return
- memset(data, 0, data_len);
+ (void)memset(data, 0, data_len);
if (!reader)
return;
+ g_connect_mtx.lock();
Readmap::iterator iter = g_reader_map.find(reader);
if (iter != g_reader_map.end()) {
- msg = g_client.ReadFrom(iter->second);
- if (msg == "keepalive")
+ cur_client = iter->second.second;
+ msg = cur_client->ReadFrom(iter->second.first);
+ if (msg == "keepalive") {
+ g_connect_mtx.unlock();
return;
+ }
+ strncpy(data, msg.c_str(), data_len - 1);
}
-
- strncpy(data, msg.c_str(), data_len - 1);
+ g_connect_mtx.unlock();
}
#ifdef __cplusplus
diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp
index ecb54ae..d4b0948 100644
--- a/observer_agent/grpc_comm/client.cpp
+++ b/observer_agent/grpc_comm/client.cpp
@@ -29,6 +29,8 @@ using grpc::ClientReader;
#define BUF_NUM 1024
+PubSubClient::PubSubClient() {}
+
PubSubClient::PubSubClient(std::shared_ptr<Channel> channel) : stub_(SubManager::NewStub(channel))
{
uuid_t uuid;
@@ -38,6 +40,16 @@ PubSubClient::PubSubClient(std::shared_ptr<Channel> channel) : stub_(SubManager:
uuid_str = std::string(uuid_temp);
}
+void PubSubClient::init(std::shared_ptr<Channel> channel)
+{
+ uuid_t uuid;
+ char uuid_temp[37];
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_temp);
+ uuid_str = std::string(uuid_temp);
+ stub_ = SubManager::NewStub(channel);
+}
+
std::unique_ptr<ClientReader<Message>> PubSubClient::Subscribe(const int topic)
{
SubscribeRequest request;
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
index 44d4aa9..4bde109 100644
--- a/observer_agent/grpc_comm/grpc_api.h
+++ b/observer_agent/grpc_comm/grpc_api.h
@@ -42,7 +42,9 @@ void RunServer();
class PubSubClient
{
public:
+ PubSubClient();
PubSubClient(std::shared_ptr<Channel> channel);
+ void init(std::shared_ptr<Channel> channel);
std::unique_ptr<ClientReader<Message>> Subscribe(const int topic);
void Publish(const int topic, const std::string &content);
void UnSubscribe(const int topic);
--
2.33.0

View File

@ -5,7 +5,7 @@
Name : secDetector
Summary : OS Security Intrusion Detection System
Version : 1.0
Release : 7
Release : 8
License : GPL-2.0
Source0 : %{name}-v%{version}.tar.gz
BuildRequires: kernel-devel kernel-headers
@ -26,6 +26,12 @@ Patch0008: Backport-fix-memory-leak-bug-in-sc-analyze-unit.patch
Patch0009: Backport-fix-bug-of-mc-case-not-collect-data.patch
Patch0010: Backport-del-useless-code-for-timestamp.patch
Patch0011: Backport-modify-for-getting-common-info-in-createfile.patch
Patch0012: Backport-fix-memleak-bug-in-secDetector_program_action.patch
Patch0013: Backport-fix-timestamp-memleak.patch
Patch0014: Backport-add-handle-cleanup-and-refactor-Subscribe-UnSubscrib.patch
Patch0015: Backport-lib-modify-for-unsub.patch
Patch0016: Backport-add-nullptr-check-in-Subscribe.patch
Patch0017: Backport-modify-for-multiple-sub-in-the-same-process.patch
%description
OS Security Intrusion Detection System
@ -99,6 +105,9 @@ rm -rf %{buildroot}
%attr(0644,root,root) /usr/include/secDetector/secDetector_topic.h
%changelog
* Tue Dec 05 2023 hurricane618 <hurricane618@hotmail.com> 1.0-8
- backport some patches
* Wed Nov 29 2023 hurricane618 <hurricane618@hotmail.com> 1.0-7
- fix backport patch0001