From 1da825dd02e5701bbe12e6aca8c4c2453b6e8710 Mon Sep 17 00:00:00 2001 From: fly_1997 Date: Tue, 2 Jul 2024 14:01:38 +0800 Subject: [PATCH] reduce thread collector load --- thread_collector/thread_collector.cpp | 128 +++++++++++++++++++------- 1 file changed, 94 insertions(+), 34 deletions(-) diff --git a/thread_collector/thread_collector.cpp b/thread_collector/thread_collector.cpp index d86e039..f1d9cc4 100644 --- a/thread_collector/thread_collector.cpp +++ b/thread_collector/thread_collector.cpp @@ -11,59 +11,116 @@ ******************************************************************************/ #include "interface.h" #include "thread_info.h" -#include #include #include +#include +#include #include +#include -const std::string PATH = "/proc"; char thread_name[] = "thread_collector"; const int CYCLE_SIZE = 500; -const std::string STATUS_NAME = "Name:\t"; -const int STATUS_NAME_LENGTH = 6; +const int MAX_NAME_LENGTH = 20; static DataRingBuf ring_buf; static DataBuf data_buf; -static std::vector threads(THREAD_NUM); +static ThreadInfo threads[THREAD_NUM]; +/* For quickly access to the threads array. key: tid, value: the index of threads[THREAD_NUM]. */ +static std::unordered_map tids; +/* Saves the last modification time of the task dir in process. */ +static std::unordered_map task_time; -static int get_all_threads() { - DIR *proc_dir = opendir(PATH.c_str()); +static void clear_invalid_tid(int &num) { + int cur = -1; + + for (int i = 0; i < num; ++i) { + auto tid = threads[i].tid; + std::string task_path = "/proc/" + std::to_string(threads[i].pid) + "/task/" + std::to_string(threads[i].tid); + /* If current thread does not exist, clear it. */ + if (access(task_path.c_str(), F_OK) < 0) { + tids.erase(tid); + if (task_time.count(tid)) { + task_time.erase(tid); + } + /* Find the first invalid position. */ + if (cur == -1) { + cur = i; + } + continue; + } + /* Update threads by moving threads in the back to the front */ + if (cur != -1) { + threads[cur++] = threads[i]; + } + } + /* Update thread total number. */ + if (cur != -1) { + num = cur; + } +} + +static ThreadInfo get_thread_info(int pid, int tid) { + std::string s_path = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(tid) + "/comm"; + FILE *s_file = fopen(s_path.c_str(), "r"); + if (s_file == nullptr) { + return ThreadInfo{}; + } + char name[MAX_NAME_LENGTH]; + fscanf(s_file, "%s", name); + fclose(s_file); + return ThreadInfo{pid, tid, std::string(name)}; +} + +static bool process_not_change(struct stat *task_stat, const std::string &task_path, int pid) { + return stat(task_path.c_str(), task_stat) != 0 || (task_time.count(pid) && task_time[pid] == task_stat->st_mtime); +} + +static void collect_threads(int pid, DIR *task_dir, int &num) { + struct dirent *task_entry; + while ((task_entry = readdir(task_dir)) != nullptr) { + if (!isdigit(task_entry->d_name[0])) { + continue; + } + int tid = atoi(task_entry->d_name); + /* Update if the thread exists. */ + if (tids.count(tid)) { + threads[tids[tid]] = get_thread_info(pid, tid); + continue; + } + /* If the thread does not exist, add it. */ + if (num < THREAD_NUM) { + tids[tid] = num; + threads[num++] = get_thread_info(pid, tid); + } + } +} + +static int get_all_threads(int &num) { + DIR *proc_dir = opendir("/proc"); if (proc_dir == nullptr) { return 0; } + clear_invalid_tid(num); struct dirent *entry; - int num = 0; while ((entry = readdir(proc_dir)) != nullptr) { if (!isdigit(entry->d_name[0])) { continue; } int pid = atoi(entry->d_name); - DIR *task_dir = opendir(("/proc/" + std::to_string(pid) + "/task").c_str()); + std::string task_path = "/proc/" + std::to_string(pid) + "/task"; + DIR *task_dir = opendir(task_path.c_str()); if (task_dir == nullptr) { continue; } - struct dirent *task_entry; - while ((task_entry = readdir(task_dir)) != nullptr) { - if (!isdigit(task_entry->d_name[0])) { - continue; - } - int tid = atoi(task_entry->d_name); - std::ifstream status_file("/proc/" + std::to_string(pid) + "/task/" + std::to_string(tid) + "/status"); - if (!status_file.is_open()) { - continue; - } - std::string line; - std::string name; - while (getline(status_file, line)) { - if (line.substr(0, STATUS_NAME_LENGTH) == STATUS_NAME) { - name = line.substr(STATUS_NAME_LENGTH); - break; - } - } - if (num < THREAD_NUM) { - threads[num++] = ThreadInfo{pid, tid, name}; - } - status_file.close(); - } + struct stat task_stat; + /* Continue if the process does not change */ + if (process_not_change(&task_stat, task_path, pid)) { + closedir(task_dir); + continue; + } + /* Update last modification time of the process. */ + task_time[pid] = task_stat.st_mtime; + /* Update threads info */ + collect_threads(pid, task_dir, num); closedir(task_dir); } @@ -90,6 +147,8 @@ int get_priority() { return 0; } bool enable() { + tids.clear(); + task_time.clear(); ring_buf.count = 0; ring_buf.index = -1; ring_buf.buf_len = 1; @@ -109,9 +168,10 @@ void run(const Param *param) { (void)param; ring_buf.count++; int index = (ring_buf.index + 1) % ring_buf.buf_len; - int num = get_all_threads(); + int num = data_buf.len; + get_all_threads(num); data_buf.len = num; - data_buf.data = threads.data(); + data_buf.data = (void*)threads; ring_buf.buf[index] = data_buf; ring_buf.index = index; } -- 2.33.0