From aca8d49dea577cac55a6b9704770882f18649fb9 Mon Sep 17 00:00:00 2001 From: hurricane618 Date: Fri, 1 Dec 2023 01:26:18 +0800 Subject: [PATCH 3/6] add handle cleanup and refactor Subscribe/UnSubscribe 1. fix error residue handle 2. refactor Subscribe and UnSubscribe Signed-off-by: hurricane618 --- observer_agent/grpc_comm/Makefile | 8 +---- observer_agent/grpc_comm/grpc_api.h | 9 ++++- observer_agent/grpc_comm/server.cpp | 51 +++++++++++++++++++++++++++-- 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/observer_agent/grpc_comm/Makefile b/observer_agent/grpc_comm/Makefile index 4dbaa46..3c87ad8 100644 --- a/observer_agent/grpc_comm/Makefile +++ b/observer_agent/grpc_comm/Makefile @@ -38,13 +38,7 @@ PROTOS_PATH = ./protos vpath %.proto $(PROTOS_PATH) -all: system-check client server client_pub_demo client_sub_demo server_demo - -client: comm_api.pb.o comm_api.grpc.pb.o client.o - @echo "only compile client don't link" - -server: comm_api.pb.o comm_api.grpc.pb.o server.o - @echo "only compile server don't link" +all: system-check client_pub_demo client_sub_demo server_demo client_pub_demo: comm_api.pb.o comm_api.grpc.pb.o client.o client_pub_demo.o $(CXX) $^ $(LDFLAGS) -o $@ diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h index 7b19552..44d4aa9 100644 --- a/observer_agent/grpc_comm/grpc_api.h +++ b/observer_agent/grpc_comm/grpc_api.h @@ -18,6 +18,8 @@ using grpc::ServerBuilder; using grpc::ServerContext; using grpc::ServerWriter; +#define MAX_CONNECTION 5 + class PubSubServiceImpl final : public SubManager::Service { public: @@ -26,7 +28,12 @@ class PubSubServiceImpl final : public SubManager::Service grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response); private: - std::unordered_map *>> subscribers_; + std::unordered_map> suber_topic_; + std::unordered_map *>> suber_writer_; + std::unordered_map> suber_connection_; + std::mutex sub_mutex; + int connection_num = 0; + bool connect_status[MAX_CONNECTION] = {false}; }; void StopServer(); diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 04e8163..54ae66f 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -28,6 +28,7 @@ using grpc::ServerContext; using grpc::ServerWriter; #define MAX_CONNECTION 5 +#define CHECK_TIME 60 class PubSubServiceImpl final : public SubManager::Service { @@ -38,6 +39,8 @@ class PubSubServiceImpl final : public SubManager::Service int cli_topic = request->topic(); std::string cli_name = request->sub_name(); Message msg; + Message keepalive_msg; + int i = 0, tmp_index; if (connection_num >= MAX_CONNECTION) { msg.set_text("over max connection number!"); @@ -65,8 +68,28 @@ class PubSubServiceImpl final : public SubManager::Service sub_mutex.lock(); + for (tmp_index = 0; tmp_index < MAX_CONNECTION; tmp_index++) + { + if (!connect_status[tmp_index]) + break; + } + + if (tmp_index == MAX_CONNECTION) + { + sub_mutex.unlock(); + msg.set_text("multi-process max connection number!"); + if (!writer->Write(msg)) + { + std::cerr << "Failed to write the initial message" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + } + return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic"); + } + suber_topic_[cli_name].push_back(cli_topic); suber_writer_[cli_name].push_back(writer); + suber_connection_[cli_name].push_back(tmp_index); + connect_status[tmp_index] = true; connection_num++; sub_mutex.unlock(); @@ -78,9 +101,29 @@ class PubSubServiceImpl final : public SubManager::Service return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); } - // ToDo: set some condition to break loop - while (1) + keepalive_msg.set_text("keepalive"); + while (connect_status[tmp_index]) { + sleep(CHECK_TIME); + if (!writer->Write(keepalive_msg)) + { + for (auto topic_item : suber_topic_[cli_name]) + { + if (topic_item == cli_topic) + { + sub_mutex.lock(); + suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); + suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + connect_status[suber_connection_[cli_name].at(i)] = false; + suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); + connection_num--; + sub_mutex.unlock(); + break; + } + i++; + } + return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!"); + } } return grpc::Status::OK; } @@ -136,6 +179,8 @@ class PubSubServiceImpl final : public SubManager::Service { suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + connect_status[suber_connection_[cli_name].at(i)] = false; + suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); connection_num--; unsub_flag = 1; break; @@ -155,8 +200,10 @@ class PubSubServiceImpl final : public SubManager::Service private: std::unordered_map> suber_topic_; std::unordered_map *>> suber_writer_; + std::unordered_map> suber_connection_; std::mutex sub_mutex; int connection_num = 0; + bool connect_status[MAX_CONNECTION] = {false}; }; std::unique_ptr server; -- 2.33.0