Backport patches to fix issues such as gRPC hangs. Signed-off-by: chenjingwen <lhchenjw@gmail.com>
190 lines
7.1 KiB
Diff
190 lines
7.1 KiB
Diff
From 4a58573122e2c677c427cb43bb02e6f055fdf391 Mon Sep 17 00:00:00 2001
|
|
From: zgzxx <zhangguangzhi3@huawei.com>
|
|
Date: Mon, 11 Dec 2023 20:57:58 +0800
|
|
Subject: [PATCH] secUnsub del topic
|
|
|
|
---
|
|
examples/python/client.py | 4 +--
|
|
lib/secDetector_sdk.cpp | 8 ++----
|
|
lib/secDetector_sdk.h | 2 +-
|
|
observer_agent/grpc_comm/client.cpp | 3 +--
|
|
observer_agent/grpc_comm/client_sub_demo.cpp | 2 +-
|
|
observer_agent/grpc_comm/grpc_api.h | 2 +-
|
|
.../grpc_comm/protos/comm_api.proto | 3 +--
|
|
observer_agent/grpc_comm/server.cpp | 27 ++++++++-----------
|
|
8 files changed, 20 insertions(+), 31 deletions(-)
|
|
|
|
diff --git a/examples/python/client.py b/examples/python/client.py
|
|
index 312384d..3fb95b4 100644
|
|
--- a/examples/python/client.py
|
|
+++ b/examples/python/client.py
|
|
@@ -31,7 +31,7 @@ g_cli_reader_lock = threading.Lock()
|
|
|
|
secDetectorsdklib.secSub.argtypes = [ctypes.c_int]
|
|
secDetectorsdklib.secSub.restype = ctypes.c_void_p
|
|
-secDetectorsdklib.secUnsub.argtypes = [ctypes.c_int, ctypes.c_void_p]
|
|
+secDetectorsdklib.secUnsub.argtypes = [ctypes.c_void_p]
|
|
secDetectorsdklib.secUnsub.restype = None
|
|
secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int]
|
|
secDetectorsdklib.secReadFrom.restype = None
|
|
@@ -66,7 +66,7 @@ def thread_func_unsub(num=0):
|
|
g_cli_reader_lock.acquire()
|
|
try:
|
|
g_read_flag = False
|
|
- secDetectorsdklib.secUnsub(1, g_cli_reader)
|
|
+ secDetectorsdklib.secUnsub(g_cli_reader)
|
|
finally:
|
|
g_cli_reader_lock.release()
|
|
print("client thread_func_unsub end")
|
|
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
|
|
index 6f47f41..6b00953 100644
|
|
--- a/lib/secDetector_sdk.cpp
|
|
+++ b/lib/secDetector_sdk.cpp
|
|
@@ -62,13 +62,9 @@ void *secSub(const int topic)
|
|
return ret_reader;
|
|
}
|
|
|
|
-void secUnsub(const int topic, void *reader)
|
|
+void secUnsub(void *reader)
|
|
{
|
|
PubSubClient *cur_client;
|
|
- if (topic <= 0 || topic > ALLTOPIC) {
|
|
- printf("lib secUnsub failed, topic:%d is error\n", topic);
|
|
- return;
|
|
- }
|
|
|
|
if (!reader)
|
|
return;
|
|
@@ -77,7 +73,7 @@ void secUnsub(const int topic, void *reader)
|
|
Readmap::iterator iter = g_reader_map.find(reader);
|
|
if (iter != g_reader_map.end()) {
|
|
cur_client = iter->second.second;
|
|
- cur_client->UnSubscribe(topic);
|
|
+ cur_client->UnSubscribe();
|
|
g_reader_map.erase(iter);
|
|
reader = NULL;
|
|
delete cur_client;
|
|
diff --git a/lib/secDetector_sdk.h b/lib/secDetector_sdk.h
|
|
index abf112b..92ef5b4 100644
|
|
--- a/lib/secDetector_sdk.h
|
|
+++ b/lib/secDetector_sdk.h
|
|
@@ -18,7 +18,7 @@
|
|
#define SECDETECTOR_SDK_H
|
|
|
|
void *secSub(const int topic);
|
|
-void secUnsub(const int topic, void *reader);
|
|
+void secUnsub(void *reader);
|
|
void secReadFrom(void *reader, char *data, int data_len);
|
|
|
|
#endif
|
|
diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp
|
|
index 0dd02f9..5cf8cf2 100644
|
|
--- a/observer_agent/grpc_comm/client.cpp
|
|
+++ b/observer_agent/grpc_comm/client.cpp
|
|
@@ -87,10 +87,9 @@ void PubSubClient::Publish(const int topic, const std::string &content)
|
|
}
|
|
}
|
|
|
|
-void PubSubClient::UnSubscribe(const int topic)
|
|
+void PubSubClient::UnSubscribe(void)
|
|
{
|
|
UnSubscribeRequest request;
|
|
- request.set_topic(topic);
|
|
request.set_sub_name(uuid_str);
|
|
|
|
ClientContext unsub_context;
|
|
diff --git a/observer_agent/grpc_comm/client_sub_demo.cpp b/observer_agent/grpc_comm/client_sub_demo.cpp
|
|
index fbf27ad..550b503 100644
|
|
--- a/observer_agent/grpc_comm/client_sub_demo.cpp
|
|
+++ b/observer_agent/grpc_comm/client_sub_demo.cpp
|
|
@@ -34,7 +34,7 @@ int main(int argc, char **argv)
|
|
some_data = client.ReadFrom(cli_reader);
|
|
std::cout << "loop whz: " << some_data << std::endl;
|
|
}
|
|
- client.UnSubscribe(std::stoi(argv[1]));
|
|
+ client.UnSubscribe();
|
|
|
|
return 0;
|
|
}
|
|
diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h
|
|
index 27a9139..c5b43cc 100644
|
|
--- a/observer_agent/grpc_comm/grpc_api.h
|
|
+++ b/observer_agent/grpc_comm/grpc_api.h
|
|
@@ -48,7 +48,7 @@ class PubSubClient
|
|
void init(std::shared_ptr<Channel> channel);
|
|
std::unique_ptr<ClientReader<Message>> Subscribe(const int topic);
|
|
void Publish(const int topic, const std::string &content);
|
|
- void UnSubscribe(const int topic);
|
|
+ void UnSubscribe(void);
|
|
std::string ReadFrom(std::unique_ptr<ClientReader<Message>> &reader);
|
|
|
|
private:
|
|
diff --git a/observer_agent/grpc_comm/protos/comm_api.proto b/observer_agent/grpc_comm/protos/comm_api.proto
|
|
index 6c84865..cf1e445 100644
|
|
--- a/observer_agent/grpc_comm/protos/comm_api.proto
|
|
+++ b/observer_agent/grpc_comm/protos/comm_api.proto
|
|
@@ -13,8 +13,7 @@ message SubscribeRequest {
|
|
}
|
|
|
|
message UnSubscribeRequest {
|
|
- int32 topic = 1;
|
|
- string sub_name = 2;
|
|
+ string sub_name = 1;
|
|
}
|
|
|
|
message PublishRequest {
|
|
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
|
|
index b47b1aa..938d09c 100644
|
|
--- a/observer_agent/grpc_comm/server.cpp
|
|
+++ b/observer_agent/grpc_comm/server.cpp
|
|
@@ -176,7 +176,6 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
|
|
grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response) override
|
|
{
|
|
- int cli_topic = request->topic();
|
|
std::string cli_name = request->sub_name();
|
|
int i = 0;
|
|
int unsub_flag = 0;
|
|
@@ -189,27 +188,23 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
|
|
std::lock_guard<std::mutex> lock(sub_mutex);
|
|
|
|
- for (auto topic_item : suber_topic_[cli_name])
|
|
+ std::unordered_map<std::string, std::vector<int>>::iterator iter = suber_topic_.find(cli_name);
|
|
+ if (iter != suber_topic_.end())
|
|
{
|
|
- if (topic_item == cli_topic)
|
|
- {
|
|
- suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
|
|
- suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
|
|
- connect_status[suber_connection_[cli_name].at(i)] = false;
|
|
- suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
|
|
- connection_num--;
|
|
- unsub_flag = 1;
|
|
- break;
|
|
- }
|
|
- i++;
|
|
+ suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i);
|
|
+ suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i);
|
|
+ connect_status[suber_connection_[cli_name].at(i)] = false;
|
|
+ suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i);
|
|
+ connection_num--;
|
|
+ unsub_flag = 1;
|
|
}
|
|
|
|
if (!unsub_flag)
|
|
{
|
|
- response->set_text("don't exist the topic: " + std::to_string(cli_topic));
|
|
- return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe topic!");
|
|
+ response->set_text("don't exist the reader");
|
|
+ return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe reader!");
|
|
}
|
|
- response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!");
|
|
+ response->set_text("UnSubscribe success!");
|
|
return grpc::Status::OK;
|
|
}
|
|
|
|
--
|
|
2.33.0
|
|
|