89 lines
3.7 KiB
Diff
89 lines
3.7 KiB
Diff
From be8f48eed633d99aaf9eadd25d7562391d0807b9 Mon Sep 17 00:00:00 2001
|
|
From: algorithmofdish <hexiujun1@huawei.com>
|
|
Date: Wed, 14 Dec 2022 15:30:06 +0800
|
|
Subject: [PATCH] perf(infer): optimize cause location with time delay
|
|
|
|
---
|
|
cause_inference/abnormal_event.py | 8 ++++++++
|
|
cause_inference/cause_infer.py | 11 ++++++++---
|
|
2 files changed, 16 insertions(+), 3 deletions(-)
|
|
|
|
diff --git a/cause_inference/abnormal_event.py b/cause_inference/abnormal_event.py
|
|
index f55c3d0..599d72d 100644
|
|
--- a/cause_inference/abnormal_event.py
|
|
+++ b/cause_inference/abnormal_event.py
|
|
@@ -2,6 +2,7 @@ import json
|
|
from enum import Enum
|
|
from queue import Queue, Empty
|
|
from typing import List
|
|
+import time
|
|
|
|
from kafka import KafkaConsumer
|
|
|
|
@@ -37,6 +38,7 @@ class AbnEvtMgt:
|
|
except Empty as ex:
|
|
raise NoKpiEventException from ex
|
|
|
|
+ self.wait_future_evts(abn_kpi.timestamp)
|
|
self.consume_kpi_evts_with_deadline(abn_kpi.timestamp)
|
|
self.consume_metric_evts_with_deadline(abn_kpi.timestamp)
|
|
self.clear_aging_evts(abn_kpi.timestamp)
|
|
@@ -145,6 +147,12 @@ class AbnEvtMgt:
|
|
def is_future(self, evt_ts, cur_ts):
|
|
return evt_ts > cur_ts + self.future_duration
|
|
|
|
+ def wait_future_evts(self, evt_ts):
|
|
+ cur_ts = int(time.time()) * 1000
|
|
+ if evt_ts <= cur_ts < evt_ts + self.future_duration:
|
|
+ wait_sec = (evt_ts + self.future_duration - cur_ts) // 1000
|
|
+ time.sleep(wait_sec)
|
|
+
|
|
|
|
def preprocess_abn_score(score):
|
|
return max(0, score)
|
|
diff --git a/cause_inference/cause_infer.py b/cause_inference/cause_infer.py
|
|
index 82a83e1..b22768f 100644
|
|
--- a/cause_inference/cause_infer.py
|
|
+++ b/cause_inference/cause_infer.py
|
|
@@ -58,10 +58,14 @@ class CauseLocator:
|
|
@staticmethod
|
|
def filter_causes(causes: List[Cause]) -> List[Cause]:
|
|
res = []
|
|
+ dup = set()
|
|
for cause in causes:
|
|
filtered_cause = CauseLocator.clear_virtual_cause(cause)
|
|
if filtered_cause is not None:
|
|
- res.append(filtered_cause)
|
|
+ key = (filtered_cause.metric_id, filtered_cause.entity_id)
|
|
+ if key not in dup:
|
|
+ dup.add(key)
|
|
+ res.append(filtered_cause)
|
|
return res
|
|
|
|
def construct_causal_graph(self, entity_causal_relations: List[tuple], abn_metrics: List[AbnormalEvent],
|
|
@@ -106,10 +110,11 @@ class CauseLocator:
|
|
self.topo_ts = self.topo_db_mgt.query_recent_topo_ts(self.abn_kpi.timestamp // 1000)
|
|
|
|
def calc_corr_score(self, causal_graph: CausalGraph):
|
|
+ end_ts = self.abn_kpi.timestamp // 1000 + infer_config.infer_conf.get('evt_future_duration')
|
|
if not self.abn_kpi.hist_data:
|
|
hist_data = self.metric_db_mgt.query_metric_hist_data(self.abn_kpi.abnormal_metric_id,
|
|
self.abn_kpi.metric_labels,
|
|
- self.topo_ts)
|
|
+ end_ts)
|
|
self.abn_kpi.set_hist_data(hist_data)
|
|
|
|
for node_id, node_attrs in causal_graph.entity_cause_graph.nodes.items():
|
|
@@ -120,7 +125,7 @@ class CauseLocator:
|
|
|
|
abn_metrics = causal_graph.get_abnormal_metrics(node_id)
|
|
for metric_id, metric_attrs in abn_metrics.items():
|
|
- metric_hist_data = self.metric_db_mgt.query_metric_hist_data(metric_id, metric_labels, self.topo_ts)
|
|
+ metric_hist_data = self.metric_db_mgt.query_metric_hist_data(metric_id, metric_labels, end_ts)
|
|
|
|
data_trend = trend(metric_hist_data)
|
|
metric_attrs.setdefault('real_trend', data_trend)
|
|
--
|
|
2.21.0.windows.1
|
|
|