179 lines
5.1 KiB
Diff
179 lines
5.1 KiB
Diff
From 511ad1d3ce2c9e44621c9b508b3e44d3d5098f0e Mon Sep 17 00:00:00 2001
|
|
From: zgzxx <zhangguangzhi3@huawei.com>
|
|
Date: Mon, 4 Dec 2023 15:31:22 +0800
|
|
Subject: [PATCH 6/6] support multiple subscriptions in the same process
|
|
|
|
---
|
|
lib/secDetector_sdk.cpp | 54 +++++++++++++++++++++--------
|
|
observer_agent/grpc_comm/client.cpp | 12 +++++++
|
|
observer_agent/grpc_comm/grpc_api.h | 2 ++
|
|
3 files changed, 53 insertions(+), 15 deletions(-)
|
|
|
|
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
|
|
index 847dd2f..6f47f41 100644
|
|
--- a/lib/secDetector_sdk.cpp
|
|
+++ b/lib/secDetector_sdk.cpp
|
|
@@ -16,15 +16,17 @@
|
|
|
|
#include <string>
|
|
#include <iostream>
|
|
+#include <list>
|
|
+#include <mutex>
|
|
#include "../observer_agent/grpc_comm/grpc_api.h"
|
|
|
|
#define ALLTOPIC 0x00FFFFFF
|
|
using namespace std;
|
|
static string server_address("unix:///var/run/secDetector.sock");
|
|
-static PubSubClient g_client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
|
|
|
|
-using Readmap = map<void *, unique_ptr<ClientReader<Message>>>;
|
|
+using Readmap = map<void *, std::pair<unique_ptr<ClientReader<Message>>, PubSubClient *>>;
|
|
static Readmap g_reader_map;
|
|
+static mutex g_connect_mtx;
|
|
|
|
#ifdef __cplusplus
|
|
extern "C" {
|
|
@@ -32,60 +34,82 @@ extern "C" {
|
|
|
|
void *secSub(const int topic)
|
|
{
|
|
+ PubSubClient *cur_client;
|
|
if (topic <= 0 || topic > ALLTOPIC) {
|
|
- printf("secSub failed, topic:%d is error\n", topic);
|
|
+ printf("lib secSub failed, topic:%d is error\n", topic);
|
|
return NULL;
|
|
}
|
|
+ g_connect_mtx.lock();
|
|
+ std::shared_ptr<Channel> channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
|
|
+ cur_client = new(PubSubClient);
|
|
+ if (cur_client == nullptr) {
|
|
+ g_connect_mtx.unlock();
|
|
+ return NULL;
|
|
+ }
|
|
+ cur_client->init(channel);
|
|
+ unique_ptr<ClientReader<Message>> reader = cur_client->Subscribe(topic);
|
|
|
|
- unique_ptr<ClientReader<Message>> reader = g_client.Subscribe(topic);
|
|
-
|
|
- if (!reader)
|
|
+ if (!reader) {
|
|
+ printf("lib secSub failed, get reader null\n");
|
|
+ delete cur_client;
|
|
+ g_connect_mtx.unlock();
|
|
return NULL;
|
|
+ }
|
|
void * ret_reader = static_cast<void *>(reader.get());
|
|
|
|
- g_reader_map.insert(Readmap::value_type(ret_reader, move(reader)));
|
|
+ g_reader_map.insert(Readmap::value_type(ret_reader, std::make_pair(move(reader), cur_client)));
|
|
+ g_connect_mtx.unlock();
|
|
return ret_reader;
|
|
}
|
|
|
|
void secUnsub(const int topic, void *reader)
|
|
{
|
|
+ PubSubClient *cur_client;
|
|
if (topic <= 0 || topic > ALLTOPIC) {
|
|
- printf("secUnsub failed, topic:%d is error\n", topic);
|
|
+ printf("lib secUnsub failed, topic:%d is error\n", topic);
|
|
return;
|
|
}
|
|
|
|
if (!reader)
|
|
return;
|
|
|
|
- g_client.UnSubscribe(topic);
|
|
-
|
|
+ g_connect_mtx.lock();
|
|
Readmap::iterator iter = g_reader_map.find(reader);
|
|
if (iter != g_reader_map.end()) {
|
|
+ cur_client = iter->second.second;
|
|
+ cur_client->UnSubscribe(topic);
|
|
g_reader_map.erase(iter);
|
|
reader = NULL;
|
|
+ delete cur_client;
|
|
}
|
|
+ g_connect_mtx.unlock();
|
|
}
|
|
|
|
void secReadFrom(void *reader, char *data, int data_len)
|
|
{
|
|
string msg("");
|
|
+ PubSubClient *cur_client;
|
|
|
|
if (!data || data_len <= 1)
|
|
return
|
|
|
|
- memset(data, 0, data_len);
|
|
+ (void)memset(data, 0, data_len);
|
|
|
|
if (!reader)
|
|
return;
|
|
|
|
+ g_connect_mtx.lock();
|
|
Readmap::iterator iter = g_reader_map.find(reader);
|
|
if (iter != g_reader_map.end()) {
|
|
- msg = g_client.ReadFrom(iter->second);
|
|
- if (msg == "keepalive")
|
|
+ cur_client = iter->second.second;
|
|
+ msg = cur_client->ReadFrom(iter->second.first);
|
|
+ if (msg == "keepalive") {
|
|
+ g_connect_mtx.unlock();
|
|
return;
|
|
+ }
|
|
+ strncpy(data, msg.c_str(), data_len - 1);
|
|
}
|
|
-
|
|
- strncpy(data, msg.c_str(), data_len - 1);
|
|
+ g_connect_mtx.unlock();
|
|
}
|
|
|
|
#ifdef __cplusplus
|
|
diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp
|
|
index ecb54ae..d4b0948 100644
|
|
--- a/observer_agent/grpc_comm/client.cpp
|
|
+++ b/observer_agent/grpc_comm/client.cpp
|
|
@@ -29,6 +29,8 @@ using grpc::ClientReader;
|
|
|
|
#define BUF_NUM 1024
|
|
|
|
+PubSubClient::PubSubClient() {}
|
|
+
|
|
PubSubClient::PubSubClient(std::shared_ptr<Channel> channel) : stub_(SubManager::NewStub(channel))
|
|
{
|
|
uuid_t uuid;
|
|
@@ -38,6 +40,16 @@ PubSubClient::PubSubClient(std::shared_ptr<Channel> channel) : stub_(SubManager:
|
|
uuid_str = std::string(uuid_temp);
|
|
}
|
|
|
|
+void PubSubClient::init(std::shared_ptr<Channel> channel)
|
|
+{
|
|
+ uuid_t uuid;
|
|
+ char uuid_temp[37];
|
|
+ uuid_generate(uuid);
|
|
+ uuid_unparse(uuid, uuid_temp);
|
|
+ uuid_str = std::string(uuid_temp);
|
|
+ stub_ = SubManager::NewStub(channel);
|
|
+}
|
|
+
|
|
std::unique_ptr<ClientReader<Message>> PubSubClient::Subscribe(const int topic)
|
|
{
|
|
SubscribeRequest request;
|
|
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
|
|
index 44d4aa9..4bde109 100644
|
|
--- a/observer_agent/grpc_comm/grpc_api.h
|
|
+++ b/observer_agent/grpc_comm/grpc_api.h
|
|
@@ -42,7 +42,9 @@ void RunServer();
|
|
class PubSubClient
|
|
{
|
|
public:
|
|
+ PubSubClient();
|
|
PubSubClient(std::shared_ptr<Channel> channel);
|
|
+ void init(std::shared_ptr<Channel> channel);
|
|
std::unique_ptr<ClientReader<Message>> Subscribe(const int topic);
|
|
void Publish(const int topic, const std::string &content);
|
|
void UnSubscribe(const int topic);
|
|
--
|
|
2.33.0
|
|
|