266 lines
10 KiB
Diff
266 lines
10 KiB
Diff
|
|
From 8dd0f6984ef002e30f3d7aa133a1a439fc5d0f95 Mon Sep 17 00:00:00 2001
|
||
|
|
From: chenjingwen <lhchenjw@gmail.com>
|
||
|
|
Date: Thu, 14 Dec 2023 21:55:10 +0800
|
||
|
|
Subject: [PATCH] grpc: fix coredump in Publish
|
||
|
|
|
||
|
|
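Fix coredump in Publish: take sub_mutex in Publish and keep each subscriber's topic and writer together in a single map entry instead of parallel per-client vectors.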
|
||
|
|
|
||
|
|
Signed-off-by: chenjingwen <lhchenjw@gmail.com>
|
||
|
|
---
|
||
|
|
observer_agent/grpc_comm/server.cpp | 165 +++++++++++-----------------
|
||
|
|
1 file changed, 62 insertions(+), 103 deletions(-)
|
||
|
|
|
||
|
|
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
|
||
|
|
index 938d09c..b858853 100644
|
||
|
|
--- a/observer_agent/grpc_comm/server.cpp
|
||
|
|
+++ b/observer_agent/grpc_comm/server.cpp
|
||
|
|
@@ -33,16 +33,21 @@ using grpc::ServerWriter;
|
||
|
|
|
||
|
|
static bool killed = false;
|
||
|
|
|
||
|
|
+class Subscribers {
|
||
|
|
+public:
|
||
|
|
+ int topic;
|
||
|
|
+ ServerWriter<Message> *writer;
|
||
|
|
+
|
||
|
|
+ Subscribers(int t, ServerWriter<Message> *w) : topic(t), writer(w) {}
|
||
|
|
+ Subscribers() : topic(0), writer(nullptr) {}
|
||
|
|
+};
|
||
|
|
+
|
||
|
|
class PubSubServiceImpl final : public SubManager::Service
|
||
|
|
{
|
||
|
|
public:
|
||
|
|
void CloseAllConnection(void)
|
||
|
|
{
|
||
|
|
- std::lock_guard<std::mutex> lk(wait_mutex);
|
||
|
|
-
|
||
|
|
- for (int i = 0; i < MAX_CONNECTION; i++) {
|
||
|
|
- connect_status[i] = false;
|
||
|
|
- }
|
||
|
|
+ std::lock_guard<std::mutex> lk(wait_mutex);
|
||
|
|
|
||
|
|
killed = true;
|
||
|
|
cv.notify_all();
|
||
|
|
@@ -55,50 +60,21 @@ class PubSubServiceImpl final : public SubManager::Service
|
||
|
|
std::string cli_name = request->sub_name();
|
||
|
|
Message msg;
|
||
|
|
Message keepalive_msg;
|
||
|
|
- int i = 0, tmp_index;
|
||
|
|
|
||
|
|
+ sub_mutex.lock();
|
||
|
|
if (connection_num >= MAX_CONNECTION) {
|
||
|
|
msg.set_text("over max connection number!");
|
||
|
|
- if (!writer->Write(msg))
|
||
|
|
- {
|
||
|
|
- std::cerr << "Failed to write the initial message" << std::endl;
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
||
|
|
- }
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "over max connection number, Failed to Subscribe the topic");
|
||
|
|
- }
|
||
|
|
-
|
||
|
|
- for (auto iter = suber_topic_[cli_name].begin(); iter != suber_topic_[cli_name].end(); iter++)
|
||
|
|
- {
|
||
|
|
- if ((*iter & cli_topic) != 0)
|
||
|
|
- {
|
||
|
|
- msg.set_text("this client name already subscribe the topic");
|
||
|
|
- if (!writer->Write(msg))
|
||
|
|
- {
|
||
|
|
- std::cerr << "Failed to write the initial message" << std::endl;
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
||
|
|
- }
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "this client name already subscribe the topic");
|
||
|
|
- }
|
||
|
|
- }
|
||
|
|
-
|
||
|
|
- sub_mutex.lock();
|
||
|
|
-
|
||
|
|
- for (tmp_index = 0; tmp_index < MAX_CONNECTION; tmp_index++)
|
||
|
|
- {
|
||
|
|
- if (!connect_status[tmp_index])
|
||
|
|
- break;
|
||
|
|
+ writer->Write(msg);
|
||
|
|
+ sub_mutex.unlock();
|
||
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "over max connection number");
|
||
|
|
}
|
||
|
|
|
||
|
|
- if (tmp_index == MAX_CONNECTION)
|
||
|
|
- {
|
||
|
|
+ auto iter = suber_topic_.find(cli_name);
|
||
|
|
+ if (iter != suber_topic_.end()) {
|
||
|
|
+ msg.set_text("this client name already subscribe the topic");
|
||
|
|
+ writer->Write(msg);
|
||
|
|
sub_mutex.unlock();
|
||
|
|
- msg.set_text("multi-process max connection number!");
|
||
|
|
- if (!writer->Write(msg))
|
||
|
|
- {
|
||
|
|
- std::cerr << "Failed to write the initial message" << std::endl;
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
||
|
|
- }
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic");
|
||
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "this client name already subscribe the topic");
|
||
|
|
}
|
||
|
|
|
||
|
|
msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!");
|
||
|
|
@@ -109,65 +85,50 @@ class PubSubServiceImpl final : public SubManager::Service
|
||
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
||
|
|
}
|
||
|
|
|
||
|
|
- suber_topic_[cli_name].push_back(cli_topic);
|
||
|
|
- suber_writer_[cli_name].push_back(writer);
|
||
|
|
- suber_connection_[cli_name].push_back(tmp_index);
|
||
|
|
- connect_status[tmp_index] = true;
|
||
|
|
+ std::cout << "Subscribe " << cli_name << " ok" << std::endl;
|
||
|
|
+ suber_topic_[cli_name] = Subscribers(cli_topic, writer);
|
||
|
|
connection_num++;
|
||
|
|
-
|
||
|
|
sub_mutex.unlock();
|
||
|
|
|
||
|
|
- keepalive_msg.set_text("keepalive");
|
||
|
|
- while (connect_status[tmp_index])
|
||
|
|
+ /* loop until cannot write */
|
||
|
|
+ while (!killed)
|
||
|
|
{
|
||
|
|
- if (!writer->Write(keepalive_msg))
|
||
|
|
- {
|
||
|
|
- for (auto topic_item : suber_topic_[cli_name])
|
||
|
|
- {
|
||
|
|
- if (topic_item == cli_topic)
|
||
|
|
- {
|
||
|
|
- sub_mutex.lock();
|
||
|
|
- suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
|
||
|
|
- suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
|
||
|
|
- connect_status[suber_connection_[cli_name].at(i)] = false;
|
||
|
|
- suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
|
||
|
|
- connection_num--;
|
||
|
|
- sub_mutex.unlock();
|
||
|
|
- break;
|
||
|
|
- }
|
||
|
|
- i++;
|
||
|
|
- }
|
||
|
|
+ sub_mutex.lock();
|
||
|
|
+ if (suber_topic_.count(cli_name) == 0) {
|
||
|
|
+ sub_mutex.unlock();
|
||
|
|
+ return grpc::Status::OK;
|
||
|
|
+ }
|
||
|
|
+
|
||
|
|
+ keepalive_msg.set_text("keepalive");
|
||
|
|
+ if (!writer->Write(keepalive_msg)) {
|
||
|
|
+ DeleteSubscriberByCliName(cli_name);
|
||
|
|
+ sub_mutex.unlock();
|
||
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
|
||
|
|
}
|
||
|
|
+ sub_mutex.unlock();
|
||
|
|
WaitKeeplive();
|
||
|
|
}
|
||
|
|
+
|
||
|
|
+ std::cout << cli_name << " is dead" << std::endl;
|
||
|
|
return grpc::Status::OK;
|
||
|
|
}
|
||
|
|
|
||
|
|
grpc::Status Publish(ServerContext *context, const PublishRequest *request, Message *response) override
|
||
|
|
{
|
||
|
|
+ std::lock_guard<std::mutex> lock(sub_mutex);
|
||
|
|
int cli_topic = request->topic();
|
||
|
|
std::string cli_data = request->data();
|
||
|
|
- int i = 0;
|
||
|
|
Message msg;
|
||
|
|
msg.set_text(cli_data);
|
||
|
|
|
||
|
|
for (auto iter = suber_topic_.begin(); iter != suber_topic_.end(); iter++)
|
||
|
|
{
|
||
|
|
- i = 0;
|
||
|
|
- for (auto topic_item : iter->second)
|
||
|
|
- {
|
||
|
|
- if ((topic_item & cli_topic) != 0)
|
||
|
|
- {
|
||
|
|
- auto &subscriber = suber_writer_[iter->first][i];
|
||
|
|
- if (!subscriber->Write(msg))
|
||
|
|
- {
|
||
|
|
- std::cerr << "Failed to write to a subscriber" << std::endl;
|
||
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
||
|
|
- }
|
||
|
|
- break;
|
||
|
|
+ Subscribers subscriber = iter->second;
|
||
|
|
+ if ((subscriber.topic & cli_topic) != 0) {
|
||
|
|
+ if (!subscriber.writer->Write(msg)) {
|
||
|
|
+ std::cerr << "Failed to write to a subscriber: " << iter->first << std::endl;
|
||
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message");
|
||
|
|
}
|
||
|
|
- i++;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
@@ -177,8 +138,7 @@ class PubSubServiceImpl final : public SubManager::Service
|
||
|
|
grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response) override
|
||
|
|
{
|
||
|
|
std::string cli_name = request->sub_name();
|
||
|
|
- int i = 0;
|
||
|
|
- int unsub_flag = 0;
|
||
|
|
+ std::lock_guard<std::mutex> lock(sub_mutex);
|
||
|
|
|
||
|
|
if (connection_num <= 0) {
|
||
|
|
response->set_text("connection_num <= 0, don't UnSubscribe!");
|
||
|
|
@@ -186,20 +146,7 @@ class PubSubServiceImpl final : public SubManager::Service
|
||
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "connection_num <= 0, Failed to UnSubscribe topic!");
|
||
|
|
}
|
||
|
|
|
||
|
|
- std::lock_guard<std::mutex> lock(sub_mutex);
|
||
|
|
-
|
||
|
|
- std::unordered_map<std::string, std::vector<int>>::iterator iter = suber_topic_.find(cli_name);
|
||
|
|
- if (iter != suber_topic_.end())
|
||
|
|
- {
|
||
|
|
- suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
|
||
|
|
- suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
|
||
|
|
- connect_status[suber_connection_[cli_name].at(i)] = false;
|
||
|
|
- suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
|
||
|
|
- connection_num--;
|
||
|
|
- unsub_flag = 1;
|
||
|
|
- }
|
||
|
|
-
|
||
|
|
- if (!unsub_flag)
|
||
|
|
+ if (!DeleteSubscriberByCliName(cli_name))
|
||
|
|
{
|
||
|
|
response->set_text("don't exist the reader");
|
||
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe reader!");
|
||
|
|
@@ -209,19 +156,31 @@ class PubSubServiceImpl final : public SubManager::Service
|
||
|
|
}
|
||
|
|
|
||
|
|
private:
|
||
|
|
- std::unordered_map<std::string, std::vector<int>> suber_topic_;
|
||
|
|
- std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
|
||
|
|
- std::unordered_map<std::string, std::vector<int>> suber_connection_;
|
||
|
|
+ std::unordered_map<std::string, Subscribers> suber_topic_;
|
||
|
|
std::mutex sub_mutex;
|
||
|
|
std::mutex wait_mutex;
|
||
|
|
std::condition_variable cv;
|
||
|
|
int connection_num = 0;
|
||
|
|
- bool connect_status[MAX_CONNECTION] = {false};
|
||
|
|
|
||
|
|
void WaitKeeplive(void)
|
||
|
|
{
|
||
|
|
- std::unique_lock<std::mutex> lk(wait_mutex);
|
||
|
|
- cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; });
|
||
|
|
+ std::unique_lock<std::mutex> lk(wait_mutex);
|
||
|
|
+ cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; });
|
||
|
|
+ }
|
||
|
|
+
|
||
|
|
+ /* Must be called with sub_mutex held */
|
||
|
|
+ bool DeleteSubscriberByCliName(std::string &cli_name)
|
||
|
|
+ {
|
||
|
|
+ bool exist = false;
|
||
|
|
+ std::cout << "UnSubscribe " << cli_name << " ok" << std::endl;
|
||
|
|
+
|
||
|
|
+ auto it = suber_topic_.find(cli_name);
|
||
|
|
+ if (it != suber_topic_.end()) {
|
||
|
|
+ suber_topic_.erase(it);
|
||
|
|
+ connection_num--;
|
||
|
|
+ exist = true;
|
||
|
|
+ }
|
||
|
|
+ return exist;
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
--
|
||
|
|
2.33.0
|
||
|
|
|