121 lines
3.6 KiB
Diff
121 lines
3.6 KiB
Diff
From d9a4c1cf011ab3d26b88229b5072ebbf6017893a Mon Sep 17 00:00:00 2001
|
|
From: zgzxx <zhangguangzhi3@huawei.com>
|
|
Date: Fri, 1 Dec 2023 17:36:11 +0800
|
|
Subject: [PATCH 4/6] lib modify for unsub
|
|
|
|
---
|
|
examples/python/client.py | 10 ++++++----
|
|
lib/secDetector_sdk.cpp | 19 +++++++++++++------
|
|
observer_agent/grpc_comm/server.cpp | 2 +-
|
|
3 files changed, 20 insertions(+), 11 deletions(-)
|
|
|
|
diff --git a/examples/python/client.py b/examples/python/client.py
|
|
index d6dd7aa..312384d 100644
|
|
--- a/examples/python/client.py
|
|
+++ b/examples/python/client.py
|
|
@@ -36,9 +36,11 @@ secDetectorsdklib.secUnsub.restype = None
|
|
secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int]
|
|
secDetectorsdklib.secReadFrom.restype = None
|
|
|
|
+g_read_flag = True
|
|
|
|
def thread_func_sub_and_read(num=0):
|
|
global g_cli_reader
|
|
+ global g_read_flag
|
|
|
|
cli_reader = secDetectorsdklib.secSub(1)
|
|
g_cli_reader_lock.acquire()
|
|
@@ -50,10 +52,7 @@ def thread_func_sub_and_read(num=0):
|
|
secDetectorsdklib.secReadFrom(cli_reader, data, data_len)
|
|
print("client read data:{}".format(data.value.decode()))
|
|
|
|
- while True:
|
|
- if data.value.decode() == 'end':
|
|
- print("client received end")
|
|
- break
|
|
+ while g_read_flag:
|
|
time.sleep(3)
|
|
secDetectorsdklib.secReadFrom(cli_reader, data, data_len)
|
|
print("client while read data:{}".format(data.value.decode()))
|
|
@@ -62,8 +61,11 @@ def thread_func_sub_and_read(num=0):
|
|
|
|
def thread_func_unsub(num=0):
|
|
global g_cli_reader
|
|
+ global g_read_flag
|
|
+
|
|
g_cli_reader_lock.acquire()
|
|
try:
|
|
+ g_read_flag = False
|
|
secDetectorsdklib.secUnsub(1, g_cli_reader)
|
|
finally:
|
|
g_cli_reader_lock.release()
|
|
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
|
|
index ee76079..847dd2f 100644
|
|
--- a/lib/secDetector_sdk.cpp
|
|
+++ b/lib/secDetector_sdk.cpp
|
|
@@ -55,14 +55,14 @@ void secUnsub(const int topic, void *reader)
|
|
}
|
|
|
|
if (!reader)
|
|
- return;
|
|
+ return;
|
|
|
|
- g_client.Publish(topic, "end");
|
|
g_client.UnSubscribe(topic);
|
|
|
|
Readmap::iterator iter = g_reader_map.find(reader);
|
|
if (iter != g_reader_map.end()) {
|
|
g_reader_map.erase(iter);
|
|
+ reader = NULL;
|
|
}
|
|
}
|
|
|
|
@@ -70,13 +70,20 @@ void secReadFrom(void *reader, char *data, int data_len)
|
|
{
|
|
string msg("");
|
|
|
|
- if (!reader || !data || data_len <= 1)
|
|
- return;
|
|
+ if (!data || data_len <= 1)
|
|
+ return;
|
|
+
|
|
+ memset(data, 0, data_len);
|
|
+
|
|
+ if (!reader)
|
|
+ return;
|
|
|
|
Readmap::iterator iter = g_reader_map.find(reader);
|
|
- if (iter != g_reader_map.end()) {
|
|
+ if (iter != g_reader_map.end()) {
|
|
msg = g_client.ReadFrom(iter->second);
|
|
- }
|
|
+ if (msg == "keepalive")
|
|
+ return;
|
|
+ }
|
|
|
|
strncpy(data, msg.c_str(), data_len - 1);
|
|
}
|
|
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
|
|
index 54ae66f..3340dfa 100644
|
|
--- a/observer_agent/grpc_comm/server.cpp
|
|
+++ b/observer_agent/grpc_comm/server.cpp
|
|
@@ -104,7 +104,6 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
keepalive_msg.set_text("keepalive");
|
|
while (connect_status[tmp_index])
|
|
{
|
|
- sleep(CHECK_TIME);
|
|
if (!writer->Write(keepalive_msg))
|
|
{
|
|
for (auto topic_item : suber_topic_[cli_name])
|
|
@@ -124,6 +123,7 @@ class PubSubServiceImpl final : public SubManager::Service
|
|
}
|
|
return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
|
|
}
|
|
+ sleep(CHECK_TIME);
|
|
}
|
|
return grpc::Status::OK;
|
|
}
|
|
--
|
|
2.33.0
|
|
|