backport patches to fix issues such as grpc hangs. Signed-off-by: chenjingwen <lhchenjw@gmail.com>
108 lines
3.5 KiB
Diff
108 lines
3.5 KiB
Diff
From f531f56ee36aecd3bb9eae527551eb8eff8c9457 Mon Sep 17 00:00:00 2001
|
|
From: chenjingwen <lhchenjw@gmail.com>
|
|
Date: Mon, 11 Dec 2023 19:52:42 +0800
|
|
Subject: [PATCH 2/2] secDetectord: fix a grpc hang bug
|
|
|
|
break connection loop before shutdown
|
|
so that shutdown won't hang.
|
|
|
|
Signed-off-by: chenjingwen <lhchenjw@gmail.com>
|
|
---
|
|
observer_agent/grpc_comm/grpc_api.h | 1 +
|
|
observer_agent/grpc_comm/server.cpp | 30 ++++++++++++++++++++++++++---
|
|
2 files changed, 28 insertions(+), 3 deletions(-)
|
|
|
|
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
|
|
index 4bde109..27a9139 100644
|
|
--- a/observer_agent/grpc_comm/grpc_api.h
|
|
+++ b/observer_agent/grpc_comm/grpc_api.h
|
|
@@ -26,6 +26,7 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
grpc::Status Subscribe(ServerContext *context, const SubscribeRequest *request, ServerWriter<Message> *writer);
|
|
grpc::Status Publish(ServerContext *context, const PublishRequest *request, Message *response);
|
|
grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response);
|
|
+ void CloseAllConnection(void);
|
|
|
|
private:
|
|
std::unordered_map<std::string, std::vector<int>> suber_topic_;
|
|
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
|
|
index cce5131..b47b1aa 100644
|
|
--- a/observer_agent/grpc_comm/server.cpp
|
|
+++ b/observer_agent/grpc_comm/server.cpp
|
|
@@ -16,6 +16,7 @@
|
|
#include "comm_api.grpc.pb.h"
|
|
#include <grpcpp/grpcpp.h>
|
|
#include <sys/stat.h>
|
|
+#include <condition_variable>
|
|
|
|
using data_comm::Message;
|
|
using data_comm::PublishRequest;
|
|
@@ -30,9 +31,23 @@ using grpc::ServerWriter;
|
|
#define MAX_CONNECTION 5
|
|
#define CHECK_TIME 60
|
|
|
|
+static bool killed = false;
|
|
+
|
|
class PubSubServiceImpl final : public SubManager::Service
|
|
{
|
|
public:
|
|
+ void CloseAllConnection(void)
|
|
+ {
|
|
+ std::lock_guard<std::mutex> lk(wait_mutex);
|
|
+
|
|
+ for (int i = 0; i < MAX_CONNECTION; i++) {
|
|
+ connect_status[i] = false;
|
|
+ }
|
|
+
|
|
+ killed = true;
|
|
+ cv.notify_all();
|
|
+ }
|
|
+
|
|
grpc::Status Subscribe(ServerContext *context, const SubscribeRequest *request,
|
|
ServerWriter<Message> *writer) override
|
|
{
|
|
@@ -124,7 +139,7 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
}
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
|
|
}
|
|
- sleep(CHECK_TIME);
|
|
+ WaitKeeplive();
|
|
}
|
|
return grpc::Status::OK;
|
|
}
|
|
@@ -203,21 +218,30 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
std::unordered_map<std::string, std::vector<ServerWriter<Message> *>> suber_writer_;
|
|
std::unordered_map<std::string, std::vector<int>> suber_connection_;
|
|
std::mutex sub_mutex;
|
|
+ std::mutex wait_mutex;
|
|
+ std::condition_variable cv;
|
|
int connection_num = 0;
|
|
bool connect_status[MAX_CONNECTION] = {false};
|
|
+
|
|
+ void WaitKeeplive(void)
|
|
+ {
|
|
+ std::unique_lock<std::mutex> lk(wait_mutex);
|
|
+ cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; });
|
|
+ }
|
|
};
|
|
|
|
-std::unique_ptr<Server> server;
|
|
+static std::unique_ptr<Server> server;
|
|
+static PubSubServiceImpl service;
|
|
|
|
void StopServer()
|
|
{
|
|
+ service.CloseAllConnection();
|
|
server->Shutdown();
|
|
}
|
|
|
|
void RunServer()
|
|
{
|
|
std::string server_address("unix:///var/run/secDetector.sock");
|
|
- PubSubServiceImpl service;
|
|
|
|
ServerBuilder builder;
|
|
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
|
|
--
|
|
2.33.0
|
|
|