secDetector/Backport-modify-for-multiple-sub-in-the-same-process.patch
hurricane618 d224ac9cae backport some fix patchs
fix some bugs

Signed-off-by: hurricane618 <hurricane618@hotmail.com>
2023-12-05 09:43:14 +08:00

179 lines
5.1 KiB
Diff

From 511ad1d3ce2c9e44621c9b508b3e44d3d5098f0e Mon Sep 17 00:00:00 2001
From: zgzxx <zhangguangzhi3@huawei.com>
Date: Mon, 4 Dec 2023 15:31:22 +0800
Subject: [PATCH 6/6] modify for multiple sub in the same process
---
lib/secDetector_sdk.cpp | 54 +++++++++++++++++++++--------
observer_agent/grpc_comm/client.cpp | 12 +++++++
observer_agent/grpc_comm/grpc_api.h | 2 ++
3 files changed, 53 insertions(+), 15 deletions(-)
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
index 847dd2f..6f47f41 100644
--- a/lib/secDetector_sdk.cpp
+++ b/lib/secDetector_sdk.cpp
@@ -16,15 +16,17 @@
#include <string>
#include <iostream>
+#include <list>
+#include <mutex>
#include "../observer_agent/grpc_comm/grpc_api.h"
#define ALLTOPIC 0x00FFFFFF
using namespace std;
static string server_address("unix:///var/run/secDetector.sock");
-static PubSubClient g_client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
-using Readmap = map<void *, unique_ptr<ClientReader<Message>>>;
+using Readmap = map<void *, std::pair<unique_ptr<ClientReader<Message>>, PubSubClient *>>;
static Readmap g_reader_map;
+static mutex g_connect_mtx;
#ifdef __cplusplus
extern "C" {
@@ -32,60 +34,82 @@ extern "C" {
void *secSub(const int topic)
{
+ PubSubClient *cur_client;
if (topic <= 0 || topic > ALLTOPIC) {
- printf("secSub failed, topic:%d is error\n", topic);
+ printf("lib secSub failed, topic:%d is error\n", topic);
return NULL;
}
+ g_connect_mtx.lock();
+ std::shared_ptr<Channel> channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
+ cur_client = new(PubSubClient);
+ if (cur_client == nullptr) {
+ g_connect_mtx.unlock();
+ return NULL;
+ }
+ cur_client->init(channel);
+ unique_ptr<ClientReader<Message>> reader = cur_client->Subscribe(topic);
- unique_ptr<ClientReader<Message>> reader = g_client.Subscribe(topic);
-
- if (!reader)
+ if (!reader) {
+ printf("lib secSub failed, get reader null\n");
+ delete cur_client;
+ g_connect_mtx.unlock();
return NULL;
+ }
void * ret_reader = static_cast<void *>(reader.get());
- g_reader_map.insert(Readmap::value_type(ret_reader, move(reader)));
+ g_reader_map.insert(Readmap::value_type(ret_reader, std::make_pair(move(reader), cur_client)));
+ g_connect_mtx.unlock();
return ret_reader;
}
void secUnsub(const int topic, void *reader)
{
+ PubSubClient *cur_client;
if (topic <= 0 || topic > ALLTOPIC) {
- printf("secUnsub failed, topic:%d is error\n", topic);
+ printf("lib secUnsub failed, topic:%d is error\n", topic);
return;
}
if (!reader)
return;
- g_client.UnSubscribe(topic);
-
+ g_connect_mtx.lock();
Readmap::iterator iter = g_reader_map.find(reader);
if (iter != g_reader_map.end()) {
+ cur_client = iter->second.second;
+ cur_client->UnSubscribe(topic);
g_reader_map.erase(iter);
reader = NULL;
+ delete cur_client;
}
+ g_connect_mtx.unlock();
}
void secReadFrom(void *reader, char *data, int data_len)
{
string msg("");
+ PubSubClient *cur_client;
if (!data || data_len <= 1)
return
- memset(data, 0, data_len);
+ (void)memset(data, 0, data_len);
if (!reader)
return;
+ g_connect_mtx.lock();
Readmap::iterator iter = g_reader_map.find(reader);
if (iter != g_reader_map.end()) {
- msg = g_client.ReadFrom(iter->second);
- if (msg == "keepalive")
+ cur_client = iter->second.second;
+ msg = cur_client->ReadFrom(iter->second.first);
+ if (msg == "keepalive") {
+ g_connect_mtx.unlock();
return;
+ }
+ strncpy(data, msg.c_str(), data_len - 1);
}
-
- strncpy(data, msg.c_str(), data_len - 1);
+ g_connect_mtx.unlock();
}
#ifdef __cplusplus
diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp
index ecb54ae..d4b0948 100644
--- a/observer_agent/grpc_comm/client.cpp
+++ b/observer_agent/grpc_comm/client.cpp
@@ -29,6 +29,8 @@ using grpc::ClientReader;
#define BUF_NUM 1024
+PubSubClient::PubSubClient() {}
+
PubSubClient::PubSubClient(std::shared_ptr<Channel> channel) : stub_(SubManager::NewStub(channel))
{
uuid_t uuid;
@@ -38,6 +40,16 @@ PubSubClient::PubSubClient(std::shared_ptr<Channel> channel) : stub_(SubManager:
uuid_str = std::string(uuid_temp);
}
+void PubSubClient::init(std::shared_ptr<Channel> channel)
+{
+ uuid_t uuid;
+ char uuid_temp[37];
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_temp);
+ uuid_str = std::string(uuid_temp);
+ stub_ = SubManager::NewStub(channel);
+}
+
std::unique_ptr<ClientReader<Message>> PubSubClient::Subscribe(const int topic)
{
SubscribeRequest request;
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
index 44d4aa9..4bde109 100644
--- a/observer_agent/grpc_comm/grpc_api.h
+++ b/observer_agent/grpc_comm/grpc_api.h
@@ -42,7 +42,9 @@ void RunServer();
class PubSubClient
{
public:
+ PubSubClient();
PubSubClient(std::shared_ptr<Channel> channel);
+ void init(std::shared_ptr<Channel> channel);
std::unique_ptr<ClientReader<Message>> Subscribe(const int topic);
void Publish(const int topic, const std::string &content);
void UnSubscribe(const int topic);
--
2.33.0