From be8f48eed633d99aaf9eadd25d7562391d0807b9 Mon Sep 17 00:00:00 2001
From: algorithmofdish
Date: Wed, 14 Dec 2022 15:30:06 +0800
Subject: [PATCH] perf(infer): optimize cause location with time delay

---
Review notes (commentary only; ignored by `git am`, not part of the commit):

 * Layout: this patch had been collapsed onto two physical lines. The line
   structure below was rebuilt so that every hunk matches the line counts in
   its `@@` header. Leading indentation and blank context lines are inferred
   from those counts and from Python syntax -- verify against the target
   tree (or apply with `git apply --ignore-whitespace`) before relying on it.
 * wait_future_evts(): `int(time.time()) * 1000` truncates the current time
   to whole seconds before converting to milliseconds, and the remaining
   wait is floor-divided by 1000, so the sleep is only accurate to about one
   second. Nothing is slept when `cur_ts < evt_ts`.
 * Units: `self.future_duration` is added to millisecond timestamps in
   is_future()/wait_future_evts(), while calc_corr_score() adds the raw
   'evt_future_duration' config value to a timestamp in seconds. Presumably
   `self.future_duration` is that config value converted to ms -- confirm.
 * calc_corr_score(): `infer_conf.get('evt_future_duration')` is used without
   a default; if `infer_conf` is a plain dict and the key is absent, the
   addition raises TypeError -- confirm the key is always populated.

 cause_inference/abnormal_event.py |  8 ++++++++
 cause_inference/cause_infer.py    | 11 ++++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/cause_inference/abnormal_event.py b/cause_inference/abnormal_event.py
index f55c3d0..599d72d 100644
--- a/cause_inference/abnormal_event.py
+++ b/cause_inference/abnormal_event.py
@@ -2,6 +2,7 @@ import json
 from enum import Enum
 from queue import Queue, Empty
 from typing import List
+import time

 from kafka import KafkaConsumer

@@ -37,6 +38,7 @@ class AbnEvtMgt:
         except Empty as ex:
             raise NoKpiEventException from ex

+        self.wait_future_evts(abn_kpi.timestamp)
         self.consume_kpi_evts_with_deadline(abn_kpi.timestamp)
         self.consume_metric_evts_with_deadline(abn_kpi.timestamp)
         self.clear_aging_evts(abn_kpi.timestamp)
@@ -145,6 +147,12 @@ class AbnEvtMgt:
     def is_future(self, evt_ts, cur_ts):
         return evt_ts > cur_ts + self.future_duration

+    def wait_future_evts(self, evt_ts):
+        cur_ts = int(time.time()) * 1000
+        if evt_ts <= cur_ts < evt_ts + self.future_duration:
+            wait_sec = (evt_ts + self.future_duration - cur_ts) // 1000
+            time.sleep(wait_sec)
+

 def preprocess_abn_score(score):
     return max(0, score)
diff --git a/cause_inference/cause_infer.py b/cause_inference/cause_infer.py
index 82a83e1..b22768f 100644
--- a/cause_inference/cause_infer.py
+++ b/cause_inference/cause_infer.py
@@ -58,10 +58,14 @@ class CauseLocator:
     @staticmethod
     def filter_causes(causes: List[Cause]) -> List[Cause]:
         res = []
+        dup = set()
         for cause in causes:
             filtered_cause = CauseLocator.clear_virtual_cause(cause)
             if filtered_cause is not None:
-                res.append(filtered_cause)
+                key = (filtered_cause.metric_id, filtered_cause.entity_id)
+                if key not in dup:
+                    dup.add(key)
+                    res.append(filtered_cause)
         return res

     def construct_causal_graph(self, entity_causal_relations: List[tuple], abn_metrics: List[AbnormalEvent],
@@ -106,10 +110,11 @@ class CauseLocator:
         self.topo_ts = self.topo_db_mgt.query_recent_topo_ts(self.abn_kpi.timestamp // 1000)

     def calc_corr_score(self, causal_graph: CausalGraph):
+        end_ts = self.abn_kpi.timestamp // 1000 + infer_config.infer_conf.get('evt_future_duration')
         if not self.abn_kpi.hist_data:
             hist_data = self.metric_db_mgt.query_metric_hist_data(self.abn_kpi.abnormal_metric_id,
                                                                   self.abn_kpi.metric_labels,
-                                                                  self.topo_ts)
+                                                                  end_ts)
             self.abn_kpi.set_hist_data(hist_data)

         for node_id, node_attrs in causal_graph.entity_cause_graph.nodes.items():
@@ -120,7 +125,7 @@ class CauseLocator:

             abn_metrics = causal_graph.get_abnormal_metrics(node_id)
             for metric_id, metric_attrs in abn_metrics.items():
-                metric_hist_data = self.metric_db_mgt.query_metric_hist_data(metric_id, metric_labels, self.topo_ts)
+                metric_hist_data = self.metric_db_mgt.query_metric_hist_data(metric_id, metric_labels, end_ts)
                 data_trend = trend(metric_hist_data)
                 metric_attrs.setdefault('real_trend', data_trend)

-- 
2.21.0.windows.1