secDetector/Backport-add-handle-cleanup-and-refactor-Subscribe-UnSubscrib.patch
hurricane618 d224ac9cae backport some fix patches
fix some bugs

Signed-off-by: hurricane618 <hurricane618@hotmail.com>
2023-12-05 09:43:14 +08:00

From aca8d49dea577cac55a6b9704770882f18649fb9 Mon Sep 17 00:00:00 2001
From: hurricane618 <hurricane618@hotmail.com>
Date: Fri, 1 Dec 2023 01:26:18 +0800
Subject: [PATCH 3/6] add handle cleanup and refactor Subscribe/UnSubscribe

1. fix handles erroneously left behind by adding cleanup
2. refactor Subscribe and UnSubscribe

Signed-off-by: hurricane618 <hurricane618@hotmail.com>
---
observer_agent/grpc_comm/Makefile | 8 +----
observer_agent/grpc_comm/grpc_api.h | 9 ++++-
observer_agent/grpc_comm/server.cpp | 51 +++++++++++++++++++++++++++--
3 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/observer_agent/grpc_comm/Makefile b/observer_agent/grpc_comm/Makefile
index 4dbaa46..3c87ad8 100644
--- a/observer_agent/grpc_comm/Makefile
+++ b/observer_agent/grpc_comm/Makefile
@@ -38,13 +38,7 @@ PROTOS_PATH = ./protos
vpath %.proto $(PROTOS_PATH)
-all: system-check client server client_pub_demo client_sub_demo server_demo
-
-client: comm_api.pb.o comm_api.grpc.pb.o client.o
- @echo "only compile client don't link"
-
-server: comm_api.pb.o comm_api.grpc.pb.o server.o
- @echo "only compile server don't link"
+all: system-check client_pub_demo client_sub_demo server_demo
client_pub_demo: comm_api.pb.o comm_api.grpc.pb.o client.o client_pub_demo.o
$(CXX) $^ $(LDFLAGS) -o $@
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
index 7b19552..44d4aa9 100644
--- a/observer_agent/grpc_comm/grpc_api.h
+++ b/observer_agent/grpc_comm/grpc_api.h
@@ -18,6 +18,8 @@ using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerWriter;
+#define MAX_CONNECTION 5
+
class PubSubServiceImpl final : public SubManager::Service
{
public:
@@ -26,7 +28,12 @@ class PubSubServiceImpl final : public SubManager::Service
grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response);
private:
- std::unordered_map<int, std::vector<ServerWriter<Message> *>> subscribers_;
+ std::unordered_map<std::string, std::vector<int>> suber_topic_;
+ std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
+ std::unordered_map<std::string, std::vector<int>> suber_connection_;
+ std::mutex sub_mutex;
+ int connection_num = 0;
+ bool connect_status[MAX_CONNECTION] = {false};
};
void StopServer();
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
index 04e8163..54ae66f 100644
--- a/observer_agent/grpc_comm/server.cpp
+++ b/observer_agent/grpc_comm/server.cpp
@@ -28,6 +28,7 @@ using grpc::ServerContext;
using grpc::ServerWriter;
#define MAX_CONNECTION 5
+#define CHECK_TIME 60
class PubSubServiceImpl final : public SubManager::Service
{
@@ -38,6 +39,8 @@ class PubSubServiceImpl final : public SubManager::Service
int cli_topic = request->topic();
std::string cli_name = request->sub_name();
Message msg;
+ Message keepalive_msg;
+ int i = 0, tmp_index;
if (connection_num >= MAX_CONNECTION) {
msg.set_text("over max connection number!");
@@ -65,8 +68,28 @@ class PubSubServiceImpl final : public SubManager::Service
sub_mutex.lock();
+ for (tmp_index = 0; tmp_index < MAX_CONNECTION; tmp_index++)
+ {
+ if (!connect_status[tmp_index])
+ break;
+ }
+
+ if (tmp_index == MAX_CONNECTION)
+ {
+ sub_mutex.unlock();
+ msg.set_text("multi-process max connection number!");
+ if (!writer->Write(msg))
+ {
+ std::cerr << "Failed to write the initial message" << std::endl;
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
+ }
+ return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic");
+ }
+
suber_topic_[cli_name].push_back(cli_topic);
suber_writer_[cli_name].push_back(writer);
+ suber_connection_[cli_name].push_back(tmp_index);
+ connect_status[tmp_index] = true;
connection_num++;
sub_mutex.unlock();
@@ -78,9 +101,29 @@ class PubSubServiceImpl final : public SubManager::Service
return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
}
- // ToDo: set some condition to break loop
- while (1)
+ keepalive_msg.set_text("keepalive");
+ while (connect_status[tmp_index])
{
+ sleep(CHECK_TIME);
+ if (!writer->Write(keepalive_msg))
+ {
+ for (auto topic_item : suber_topic_[cli_name])
+ {
+ if (topic_item == cli_topic)
+ {
+ sub_mutex.lock();
+ suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
+ suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
+ connect_status[suber_connection_[cli_name].at(i)] = false;
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
+ connection_num--;
+ sub_mutex.unlock();
+ break;
+ }
+ i++;
+ }
+ return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
+ }
}
return grpc::Status::OK;
}
@@ -136,6 +179,8 @@ class PubSubServiceImpl final : public SubManager::Service
{
suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
+ connect_status[suber_connection_[cli_name].at(i)] = false;
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
connection_num--;
unsub_flag = 1;
break;
@@ -155,8 +200,10 @@ class PubSubServiceImpl final : public SubManager::Service
private:
std::unordered_map<std::string, std::vector<int>> suber_topic_;
std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
+ std::unordered_map<std::string, std::vector<int>> suber_connection_;
std::mutex sub_mutex;
int connection_num = 0;
+ bool connect_status[MAX_CONNECTION] = {false};
};
std::unique_ptr<Server> server;
--
2.33.0
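
Note (not part of the patch): below is a minimal standalone sketch of the fixed-slot connection bookkeeping that the refactored Subscribe/UnSubscribe share. Subscribe scans a MAX_CONNECTION-sized boolean array under a mutex for a free slot and claims it; UnSubscribe, or a failed keepalive write in Subscribe's loop, clears the flag again, which also lets the while (connect_status[tmp_index]) loop terminate. The SlotPool/Acquire/Release names are illustrative only and do not appear in the patch.

/*
 * Sketch only: fixed-slot connection bookkeeping, assuming the same
 * MAX_CONNECTION limit as grpc_api.h / server.cpp.
 */
#include <iostream>
#include <mutex>

#define MAX_CONNECTION 5

class SlotPool
{
  public:
    // Return the index of a free slot, or -1 if all slots are taken.
    int Acquire()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        for (int i = 0; i < MAX_CONNECTION; i++)
        {
            if (!used_[i])
            {
                used_[i] = true;
                connection_num_++;
                return i;
            }
        }
        return -1;
    }

    // Free a previously acquired slot, e.g. on UnSubscribe or when the
    // periodic keepalive write to the subscriber's stream fails.
    void Release(int index)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (index >= 0 && index < MAX_CONNECTION && used_[index])
        {
            used_[index] = false;
            connection_num_--;
        }
    }

  private:
    std::mutex mutex_;
    bool used_[MAX_CONNECTION] = {false};
    int connection_num_ = 0;
};

int main()
{
    SlotPool pool;
    int slot = pool.Acquire();   // Subscribe path: reserve a slot
    std::cout << "acquired slot " << slot << std::endl;
    pool.Release(slot);          // UnSubscribe path / broken writer: give it back
    return 0;
}

Keeping one flag per slot, rather than only a connection counter, is what allows the keepalive loop in Subscribe to observe an UnSubscribe performed by another thread and exit cleanly.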