secDetector/Backport-lib-modify-for-unsub.patch
hurricane618 d224ac9cae backport some fix patches
fix some bugs

Signed-off-by: hurricane618 <hurricane618@hotmail.com>
2023-12-05 09:43:14 +08:00

121 lines
3.6 KiB
Diff

From d9a4c1cf011ab3d26b88229b5072ebbf6017893a Mon Sep 17 00:00:00 2001
From: zgzxx <zhangguangzhi3@huawei.com>
Date: Fri, 1 Dec 2023 17:36:11 +0800
Subject: [PATCH 4/6] lib modify for unsub
---
examples/python/client.py | 10 ++++++----
lib/secDetector_sdk.cpp | 19 +++++++++++++------
observer_agent/grpc_comm/server.cpp | 2 +-
3 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/examples/python/client.py b/examples/python/client.py
index d6dd7aa..312384d 100644
--- a/examples/python/client.py
+++ b/examples/python/client.py
@@ -36,9 +36,11 @@ secDetectorsdklib.secUnsub.restype = None
secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int]
secDetectorsdklib.secReadFrom.restype = None
+g_read_flag = True
def thread_func_sub_and_read(num=0):
global g_cli_reader
+ global g_read_flag
cli_reader = secDetectorsdklib.secSub(1)
g_cli_reader_lock.acquire()
@@ -50,10 +52,7 @@ def thread_func_sub_and_read(num=0):
secDetectorsdklib.secReadFrom(cli_reader, data, data_len)
print("client read data:{}".format(data.value.decode()))
- while True:
- if data.value.decode() == 'end':
- print("client received end")
- break
+ while g_read_flag:
time.sleep(3)
secDetectorsdklib.secReadFrom(cli_reader, data, data_len)
print("client while read data:{}".format(data.value.decode()))
@@ -62,8 +61,11 @@ def thread_func_sub_and_read(num=0):
def thread_func_unsub(num=0):
global g_cli_reader
+ global g_read_flag
+
g_cli_reader_lock.acquire()
try:
+ g_read_flag = False
secDetectorsdklib.secUnsub(1, g_cli_reader)
finally:
g_cli_reader_lock.release()
diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp
index ee76079..847dd2f 100644
--- a/lib/secDetector_sdk.cpp
+++ b/lib/secDetector_sdk.cpp
@@ -55,14 +55,14 @@ void secUnsub(const int topic, void *reader)
}
if (!reader)
- return;
+ return;
- g_client.Publish(topic, "end");
g_client.UnSubscribe(topic);
Readmap::iterator iter = g_reader_map.find(reader);
if (iter != g_reader_map.end()) {
g_reader_map.erase(iter);
+ reader = NULL;
}
}
@@ -70,13 +70,20 @@ void secReadFrom(void *reader, char *data, int data_len)
{
string msg("");
- if (!reader || !data || data_len <= 1)
- return;
+ if (!data || data_len <= 1)
+ return;
+
+ memset(data, 0, data_len);
+
+ if (!reader)
+ return;
Readmap::iterator iter = g_reader_map.find(reader);
- if (iter != g_reader_map.end()) {
+ if (iter != g_reader_map.end()) {
msg = g_client.ReadFrom(iter->second);
- }
+ if (msg == "keepalive")
+ return;
+ }
strncpy(data, msg.c_str(), data_len - 1);
}
diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp
index 54ae66f..3340dfa 100644
--- a/observer_agent/grpc_comm/server.cpp
+++ b/observer_agent/grpc_comm/server.cpp
@@ -104,7 +104,6 @@ class PubSubServiceImpl final : public SubManager::Service
keepalive_msg.set_text("keepalive");
while (connect_status[tmp_index])
{
- sleep(CHECK_TIME);
if (!writer->Write(keepalive_msg))
{
for (auto topic_item : suber_topic_[cli_name])
@@ -124,6 +123,7 @@ class PubSubServiceImpl final : public SubManager::Service
}
return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!");
}
+ sleep(CHECK_TIME);
}
return grpc::Status::OK;
}
--
2.33.0