167 lines
6.4 KiB
Diff
167 lines
6.4 KiB
Diff
From aca8d49dea577cac55a6b9704770882f18649fb9 Mon Sep 17 00:00:00 2001
|
|
From: hurricane618 <hurricane618@hotmail.com>
|
|
Date: Fri, 1 Dec 2023 01:26:18 +0800
|
|
Subject: [PATCH 3/6] add handle cleanup and refactor Subscribe/UnSubscribe
|
|
|
|
1. fix error residue handle
|
|
2. refactor Subscribe and UnSubscribe
|
|
|
|
Signed-off-by: hurricane618 <hurricane618@hotmail.com>
|
|
---
|
|
observer_agent/grpc_comm/Makefile | 8 +----
|
|
observer_agent/grpc_comm/grpc_api.h | 9 ++++-
|
|
observer_agent/grpc_comm/server.cpp | 51 +++++++++++++++++++++++++++--
|
|
3 files changed, 58 insertions(+), 10 deletions(-)
|
|
|
|
diff --git a/observer_agent/grpc_comm/Makefile b/observer_agent/grpc_comm/Makefile
|
|
index 4dbaa46..3c87ad8 100644
|
|
--- a/observer_agent/grpc_comm/Makefile
|
|
+++ b/observer_agent/grpc_comm/Makefile
|
|
@@ -38,13 +38,7 @@ PROTOS_PATH = ./protos
|
|
|
|
vpath %.proto $(PROTOS_PATH)
|
|
|
|
-all: system-check client server client_pub_demo client_sub_demo server_demo
|
|
-
|
|
-client: comm_api.pb.o comm_api.grpc.pb.o client.o
|
|
- @echo "only compile client don't link"
|
|
-
|
|
-server: comm_api.pb.o comm_api.grpc.pb.o server.o
|
|
- @echo "only compile server don't link"
|
|
+all: system-check client_pub_demo client_sub_demo server_demo
|
|
|
|
client_pub_demo: comm_api.pb.o comm_api.grpc.pb.o client.o client_pub_demo.o
|
|
$(CXX) $^ $(LDFLAGS) -o $@
|
|
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
|
|
index 7b19552..44d4aa9 100644
|
|
--- a/observer_agent/grpc_comm/grpc_api.h
|
|
+++ b/observer_agent/grpc_comm/grpc_api.h
|
|
@@ -18,6 +18,8 @@ using grpc::ServerBuilder;
|
|
using grpc::ServerContext;
|
|
using grpc::ServerWriter;
|
|
|
|
+#define MAX_CONNECTION 5
|
|
+
|
|
class PubSubServiceImpl final : public SubManager::Service
|
|
{
|
|
public:
|
|
@@ -26,7 +28,12 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response);
|
|
|
|
private:
|
|
- std::unordered_map<int, std::vector<ServerWriter<Message> *>> subscribers_;
|
|
+ std::unordered_map<std::string, std::vector<int>> suber_topic_;
|
|
+ std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
|
|
+ std::unordered_map<std::string, std::vector<int>> suber_connection_;
|
|
+ std::mutex sub_mutex;
|
|
+ int connection_num = 0;
|
|
+ bool connect_status[MAX_CONNECTION] = {false};
|
|
};
|
|
|
|
void StopServer();
|
|
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
|
|
index 04e8163..54ae66f 100644
|
|
--- a/observer_agent/grpc_comm/server.cpp
|
|
+++ b/observer_agent/grpc_comm/server.cpp
|
|
@@ -28,6 +28,7 @@ using grpc::ServerContext;
|
|
using grpc::ServerWriter;
|
|
|
|
#define MAX_CONNECTION 5
|
|
+#define CHECK_TIME 60
|
|
|
|
class PubSubServiceImpl final : public SubManager::Service
|
|
{
|
|
@@ -38,6 +39,8 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
int cli_topic = request->topic();
|
|
std::string cli_name = request->sub_name();
|
|
Message msg;
|
|
+ Message keepalive_msg;
|
|
+ int i = 0, tmp_index;
|
|
|
|
if (connection_num >= MAX_CONNECTION) {
|
|
msg.set_text("over max connection number!");
|
|
@@ -65,8 +68,28 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
|
|
sub_mutex.lock();
|
|
|
|
+ for (tmp_index = 0; tmp_index < MAX_CONNECTION; tmp_index++)
|
|
+ {
|
|
+ if (!connect_status[tmp_index])
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ if (tmp_index == MAX_CONNECTION)
|
|
+ {
|
|
+ sub_mutex.unlock();
|
|
+ msg.set_text("multi-process max connection number!");
|
|
+ if (!writer->Write(msg))
|
|
+ {
|
|
+ std::cerr << "Failed to write the initial message" << std::endl;
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
|
+ }
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic");
|
|
+ }
|
|
+
|
|
suber_topic_[cli_name].push_back(cli_topic);
|
|
suber_writer_[cli_name].push_back(writer);
|
|
+ suber_connection_[cli_name].push_back(tmp_index);
|
|
+ connect_status[tmp_index] = true;
|
|
connection_num++;
|
|
|
|
sub_mutex.unlock();
|
|
@@ -78,9 +101,29 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
|
}
|
|
|
|
- // ToDo: set some condition to break loop
|
|
- while (1)
|
|
+ keepalive_msg.set_text("keepalive");
|
|
+ while (connect_status[tmp_index])
|
|
{
|
|
+ sleep(CHECK_TIME);
|
|
+ if (!writer->Write(keepalive_msg))
|
|
+ {
|
|
+ for (auto topic_item : suber_topic_[cli_name])
|
|
+ {
|
|
+ if (topic_item == cli_topic)
|
|
+ {
|
|
+ sub_mutex.lock();
|
|
+ suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
|
|
+ suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
|
|
+ connect_status[suber_connection_[cli_name].at(i)] = false;
|
|
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
|
|
+ connection_num--;
|
|
+ sub_mutex.unlock();
|
|
+ break;
|
|
+ }
|
|
+ i++;
|
|
+ }
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
|
|
+ }
|
|
}
|
|
return grpc::Status::OK;
|
|
}
|
|
@@ -136,6 +179,8 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
{
|
|
suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
|
|
suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
|
|
+ connect_status[suber_connection_[cli_name].at(i)] = false;
|
|
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
|
|
connection_num--;
|
|
unsub_flag = 1;
|
|
break;
|
|
@@ -155,8 +200,10 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
private:
|
|
std::unordered_map<std::string, std::vector<int>> suber_topic_;
|
|
std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
|
|
+ std::unordered_map<std::string, std::vector<int>> suber_connection_;
|
|
std::mutex sub_mutex;
|
|
int connection_num = 0;
|
|
+ bool connect_status[MAX_CONNECTION] = {false};
|
|
};
|
|
|
|
std::unique_ptr<Server> server;
|
|
--
|
|
2.33.0
|
|
|