distributed-beget/0004-refactor-using-the-reactor-framework.patch
2024-03-28 14:50:31 +08:00

2144 lines
70 KiB
Diff
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

diff --git b/services/include/list.h b/services/include/list.h
new file mode 100644
index 0000000..f45bdf5
--- /dev/null
+++ b/services/include/list.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 Huawei Device Co., Ltd.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BASE_STARTUP_INITLITE_LIST_H
+#define BASE_STARTUP_INITLITE_LIST_H
+#include <stddef.h>
+
+#ifdef __cplusplus
+#if __cplusplus
+extern "C" {
+#endif
+#endif
+
+/*
+ * Intrusive doubly linked list link. Embed it in a containing struct and
+ * recover the container from a link pointer with ListEntry().
+ * ListHead is an alias for the same type.
+ */
+typedef struct ListNode {
+ struct ListNode *next;
+ struct ListNode *prev;
+} ListNode, ListHead;
+
+/*
+ * Reset `node` to the empty state, in which both links refer back to the
+ * node itself. `node` must be a ListNode/ListHead lvalue, not a pointer.
+ * The argument is parenthesized so expressions such as `*p` or `obj.head`
+ * expand correctly, and the macro does not end in a line continuation so
+ * it cannot absorb whatever follows it.
+ */
+#define ListEmpty(node) \
+    do { \
+        (node).next = &(node); \
+        (node).prev = &(node); \
+    } while (0)
+
+/* Given `ptr` to the `member` field embedded in a `type`, yield a pointer to the enclosing `type` object. */
+#define ListEntry(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
+
+
+/*
+ * List primitives; their definitions are not part of this header.
+ * NOTE(review): presumably OH_ListAddTail links `item` just before `head`
+ * and OH_ListRemove unlinks `item` from its list - confirm against the
+ * implementation.
+ */
+void OH_ListAddTail(struct ListNode *head, struct ListNode *item);
+void OH_ListRemove(struct ListNode *item);
+
+#ifdef __cplusplus
+#if __cplusplus
+}
+#endif
+#endif
+
+#endif // BASE_STARTUP_INITLITE_LIST_H
diff --git a/services/param/base/BUILD.gn b/services/param/base/BUILD.gn
index 178ac87..b253055 100644
--- a/services/param/base/BUILD.gn
+++ b/services/param/base/BUILD.gn
@@ -11,7 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/ohos.gni")
-import("//build/config/sysroot.gni")
config("exported_header_files") {
visibility = [ ":*" ]
@@ -19,7 +18,7 @@ config("exported_header_files") {
"//base/startup/init/interfaces/innerkits/include",
"//base/startup/init/services/include/param",
"//base/startup/init/services/include",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
]
}
@@ -30,7 +29,7 @@ comm_sources = [
base_include_dirs = [
"//base/startup/init/services/param/include",
"//base/startup/init/services/param/base",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
]
source_set("parameterbase") {
diff --git a/services/param/linux/param_request.c b/services/param/linux/param_request.c
index afd95fe..76947f2 100644
--- a/services/param/linux/param_request.c
+++ b/services/param/linux/param_request.c
@@ -29,9 +29,8 @@
#include <stdio.h>
#include "beget_ext.h"
-#include "param_manager.h"
-static void ClearEnv(ParamRequestMsg* pmsg, ParamRespMsg* respmsg, int fd)
+static void ClearEnv(ParamReqMsg* pmsg, ParamRespMsg* respmsg, int fd)
{
if (pmsg != NULL)
free(pmsg);
@@ -49,9 +48,8 @@ static int GetClientSocket()
struct sockaddr_un serverAddr;
bzero(&serverAddr, sizeof(serverAddr));
serverAddr.sun_family = PF_UNIX;
- strncpy(serverAddr.sun_path, PIPE_NAME, strlen(PIPE_NAME) + 1);
+ strncpy(serverAddr.sun_path, PIPE_NAME, strlen(PIPE_NAME));
if (connect(cfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
- close(cfd);
perror("Failed to connect");
return -1;
}
@@ -59,33 +57,33 @@ static int GetClientSocket()
return cfd;
}
-static struct ParamRequestMsg* GetRequestMsg(uint32_t type, uint32_t size)
+static struct ParamReqMsg* GetRequestMsg(uint32_t type, uint32_t size)
{
uint32_t data_alloc_size = size;
if (data_alloc_size > PARAM_VALUE_LEN_MAX || data_alloc_size == 0)
data_alloc_size = PARAM_VALUE_LEN_MAX;
- struct ParamRequestMsg *pmsg;
+ struct ParamReqMsg *pmsg;
if (type == GET_PARAMETER) {
- pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg));
- BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg));
+ pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg));
+ BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg");
+ bzero(pmsg, sizeof(struct ParamReqMsg));
} else if (type == SET_PARAMETER) {
- pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + data_alloc_size);
- BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg) + data_alloc_size);
+ pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg) + data_alloc_size);
+ BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg");
+ bzero(pmsg, sizeof(struct ParamReqMsg) + data_alloc_size);
} else if (type == WAIT_PARAMETER) {
- pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + data_alloc_size);
- BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg) + data_alloc_size);
+ pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg) + data_alloc_size);
+ BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg");
+ bzero(pmsg, sizeof(struct ParamReqMsg) + data_alloc_size);
}
pmsg->datasize = data_alloc_size;
pmsg->type = type;
return pmsg;
}
-static struct ParamRespMsg* StartRequest(int fd, struct ParamRequestMsg* pmsg)
+static struct ParamRespMsg* StartRequest(int fd, struct ParamReqMsg* pmsg)
{
- int ret = send(fd, pmsg, sizeof(struct ParamRequestMsg) + pmsg->datasize, 0);
+ int ret = send(fd, pmsg, sizeof(struct ParamReqMsg) + pmsg->datasize, 0);
BEGET_ERROR_CHECK(ret > 0, return NULL, "Failed to send msg");
struct ParamRespMsg* respmsg = (struct ParamRespMsg*)malloc(sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX);
@@ -106,13 +104,13 @@ int SystemSetParameter(const char *name, const char *value)
int fd = GetClientSocket();
if (fd < 0)
return -1;
- struct ParamRequestMsg* pmsg = GetRequestMsg(SET_PARAMETER, strlen(value));
+ struct ParamReqMsg* pmsg = GetRequestMsg(SET_PARAMETER, strlen(value));
if (pmsg == NULL) {
close(fd);
return -1;
}
- strncpy(pmsg->key, name, sizeof(pmsg->key) - 1);
+ strncpy(pmsg->key, name, sizeof(pmsg->key));
strncpy(pmsg->data, value, pmsg->datasize);
int ret;
struct ParamRespMsg* respmsg = StartRequest(fd, pmsg);
@@ -138,10 +136,10 @@ int SystemReadParam(const char *name, char *value, uint32_t *len)
int fd = GetClientSocket();
if (fd < 0)
return -1;
- struct ParamRequestMsg* pmsg = GetRequestMsg(GET_PARAMETER, *len);
+ struct ParamReqMsg* pmsg = GetRequestMsg(GET_PARAMETER, *len);
BEGET_ERROR_CHECK(pmsg != NULL, close(fd);return -1, "Invalid pmsg");
- strncpy(pmsg->key, name, sizeof(pmsg->key) - 1);
+ strncpy(pmsg->key, name, sizeof(pmsg->key));
int ret;
struct ParamRespMsg* respmsg = StartRequest(fd, pmsg);
if (respmsg == NULL) {
@@ -175,11 +173,14 @@ int SystemWaitParameter(const char *name, const char *value, int32_t timeout)
if (fd < 0)
return -1;
- struct ParamRequestMsg* pmsg = GetRequestMsg(WAIT_PARAMETER, strlen(value) + 1);
+ struct ParamReqMsg* pmsg = GetRequestMsg(WAIT_PARAMETER, strlen(value) + 1);
BEGET_ERROR_CHECK(pmsg != NULL, close(fd);return -1, "Invalid pmsg");
+ if (timeout < 0) {
+ timeout = 30;
+ }
pmsg->timeout = timeout;
- strncpy(pmsg->key, name, sizeof(pmsg->key) - 1);
+ strncpy(pmsg->key, name, sizeof(pmsg->key));
strncpy(pmsg->data, value, sizeof(pmsg->datasize));
struct ParamRespMsg* respmsg = StartRequest(fd, pmsg);
if (respmsg == NULL) {
diff --git a/services/param/linux/param_request.h b/services/param/linux/param_request.h
index dd95f1b..8264ed4 100644
--- a/services/param/linux/param_request.h
+++ b/services/param/linux/param_request.h
@@ -3,13 +3,13 @@
#include "parameter.h"
-typedef struct ParamRequestMsg {
+typedef struct ParamReqMsg {
uint32_t type;
uint32_t datasize;
uint32_t timeout;
char key[PARAM_NAME_LEN_MAX];
char data[0];
-} ParamRequestMsg;
+} ParamReqMsg;
typedef struct ParamRespMsg {
uint32_t flag;
diff --git a/services/param_service/BUILD.gn b/services/param_service/BUILD.gn
index 84f429f..cfcabce 100644
--- a/services/param_service/BUILD.gn
+++ b/services/param_service/BUILD.gn
@@ -21,6 +21,8 @@ ohos_executable("param_service") {
"src/param_server.c",
"src/le_utils.c",
"src/trie_comm.c",
+ "src/hash.c",
+ "src/base_task.c"
]
include_dirs = [
@@ -28,10 +30,14 @@ ohos_executable("param_service") {
"//base/startup/init/interfaces/innerkits/include/syspara",
"//base/startup/init/interfaces/innerkits/include",
"//base/startup/init/services/param/include",
- "//base/startup/init/services/param/linux/",
+ "//base/startup/init/services/param/linux",
+ "//base/startup/init/services/include",
]
deps = [ "//base/startup/init/services/utils:libinit_utils" ]
+
+ cflags = [ "-Wno-incompatible-pointer-types" ]
+
external_deps = [
"c_utils:utils",
]
diff --git b/services/param_service/include/base_task.h b/services/param_service/include/base_task.h
new file mode 100644
index 0000000..372c33e
--- /dev/null
+++ b/services/param_service/include/base_task.h
@@ -0,0 +1,79 @@
+#ifndef BSAE_TASK_H
+#define BSAE_TASK_H
+#include <stdint.h>
+
+#include "list.h"
+#include "hash.h"
+#include "base_task.h"
+#include "param_request.h"
+
+#define DEFAULT_MAX_EVENTS 1024
+
+/* Opaque loop reference handed to task callbacks; the callbacks cast it back to EventLoop*. */
+typedef void* LoopHandle;
+
+/*
+ * Readiness kind a task is registered for or notified about.
+ * Declared as a plain enum: an explicit underlying type (`enum : uint32_t`)
+ * is only standard from C23 on and is rejected by C11 compilers.
+ */
+typedef enum {
+    Event_Read,
+    Event_Write,
+} EventType;
+
+/*
+ * State of a client task: NORMAL_TYPE when a reply buffer is (or will be)
+ * pending, WAIT_TYPE while the client is parked on a wait condition.
+ * Declared as a plain enum: an explicit underlying type (`enum : uint32_t`)
+ * is only standard from C23 on and is rejected by C11 compilers.
+ */
+typedef enum {
+    NORMAL_TYPE,
+    WAIT_TYPE,
+} ClientType;
+
+/* Length-prefixed byte buffer used for socket I/O; the payload is allocated inline after the header. */
+typedef struct EventBuffer_ {
+ uint32_t datasize; /* number of valid bytes currently held in data */
+ uint8_t data[0]; /* trailing payload storage; capacity is chosen by whoever allocates the buffer */
+} EventBuffer;
+
+/*
+ * Pairs a hash-table link with a response message.
+ * NOTE(review): no user of this type is visible in this part of the patch.
+ */
+typedef struct Content_ {
+ HashNode hashNode;
+ ParamRespMsg *respmsg;
+} Content;
+
+/* Common header of every task driven by the event loop. */
+typedef struct BaseTask_ {
+ int taskId; /* file descriptor: registered with epoll and used as the hash-table key */
+ HashNode hashNode; /* link inside the loop's task table */
+ void (*close)(LoopHandle, struct BaseTask_*); /* teardown callback */
+ void (*handleEvent)(LoopHandle, struct BaseTask_*, EventType); /* invoked by the loop when the descriptor is ready */
+} BaseTask;
+
+/* Bookkeeping for one client parked on a WAIT_PARAMETER request. */
+typedef struct WaitInfo_ {
+ ListNode anchor; /* link in the list of pending waiters */
+ int32_t timeout; /* copied from the request's timeout field */
+ int32_t taskId; /* descriptor of the waiting client task */
+ char condition[0]; /* NUL-terminated "key=value" string, stored inline after the header */
+} WaitInfo;
+
+/* Task representing one connected client. `base` must stay first so the struct can be used as a BaseTask. */
+typedef struct ClientTask_ {
+ BaseTask base;
+ uint32_t type; /* NORMAL_TYPE or WAIT_TYPE; selects the active member of `info` */
+ void (*recvMessage)(LoopHandle, BaseTask*);
+ void (*sendMessage)(LoopHandle, BaseTask*);
+ void (*disconnect)(LoopHandle, BaseTask*);
+ union {
+ EventBuffer *content; /* pending reply buffer (NORMAL_TYPE) */
+ void *extraInfo; /* WaitInfo* while the client is waiting (WAIT_TYPE) */
+ } info;
+} ClientTask;
+
+/* Task for a listening endpoint. `base` must stay first so the struct can be used as a BaseTask. */
+typedef struct ServerTask_ {
+ BaseTask base;
+ void (*incommingConnect)(LoopHandle, BaseTask*); /* callback for a new incoming connection */
+} ServerTask;
+
+/* epoll-based event loop together with its operations and the table of registered tasks. */
+typedef struct EventLoop_ {
+ int epollFd; /* epoll instance descriptor */
+ int maxevents; /* capacity of the event array passed to epoll_wait */
+ void (*Run)(struct EventLoop_*); /* dispatch loop */
+ void (*AddEvent)(struct EventLoop_*, BaseTask*, EventType); /* EPOLL_CTL_ADD for the task's descriptor */
+ void (*ModEvent)(struct EventLoop_*, BaseTask*, EventType); /* EPOLL_CTL_MOD for the task's descriptor */
+ void (*DelEvent)(struct EventLoop_*, BaseTask*); /* EPOLL_CTL_DEL for the task's descriptor */
+ HashTab *tab; /* tasks looked up by descriptor when an event fires */
+} EventLoop;
+
+/* Return the process-wide loop, creating it on first use; NULL if creation fails. */
+EventLoop* GetDefaultLoop();
+/* Allocate a zeroed task object of `size` bytes; NULL on allocation failure. The caller fills in the rest. */
+BaseTask* CreateBaseTask(EventLoop *loop, uint32_t size);
+/* Invoke loop->Run when both the loop and its Run callback are set. */
+void RunLoop(EventLoop *loop);
+
+#endif // BSAE_TASK_H
\ No newline at end of file
diff --git b/services/param_service/include/hash.h b/services/param_service/include/hash.h
new file mode 100644
index 0000000..d898e30
--- /dev/null
+++ b/services/param_service/include/hash.h
@@ -0,0 +1,37 @@
+#ifndef HASH_H
+#define HASH_H
+
+#include <stddef.h>
+
+/* Given `ptr` to the HashNode `member` embedded in a `type`, yield a pointer to the enclosing `type` object. */
+#define HASHNODE_ENTRY(ptr, type, member) ((type*)((char*)(ptr) - offsetof(type, member)))
+
+/* Intrusive link for one entry of a bucket chain; embed it in the stored object. */
+typedef struct HashNode {
+ struct HashNode *next; /* next entry in the same bucket, or NULL */
+} HashNode;
+
+/* Chained hash table whose behavior is supplied through callbacks. */
+typedef struct HashTab {
+ int (*nodeHash)(HashNode*); /* hash code of a stored node */
+ int (*keyHash)(const void *key); /* hash code of a lookup key */
+ int (*nodeCompare)(HashNode*, HashNode*); /* returns 0 when two nodes are considered equal */
+ int (*keyCompare)(HashNode *node, const void *key); /* returns 0 when node matches key */
+ void (*nodeFree)(HashNode*); /* releases one node; used by HashTabDestroy */
+ int maxBucket; /* number of entries in buckets */
+ HashNode *buckets[0]; /* chain heads, allocated inline after the header */
+} HashTab;
+
+/* Creation parameters for HashTabCreate; each field is copied into the new HashTab. */
+typedef struct {
+ int (*nodeHash)(HashNode*);
+ int (*keyHash)(const void *key);
+ int (*nodeCompare)(HashNode*, HashNode*);
+ int (*keyCompare)(HashNode *node, const void *key);
+ void (*nodeFree)(HashNode*);
+ int maxBucket;
+} HashInfo;
+
+/* Allocate a table described by `info` and store it in *tab. Returns 0 on success, -1 on failure. */
+int HashTabCreate(HashTab **tab, HashInfo *info);
+/* Insert `node`. Returns 0 on success, -1 on bad arguments or when an equal node is already stored. */
+int HashNodeAdd(HashTab *tab, HashNode *node);
+void HashNodeRemove(HashTab *tab, HashNode *node); // only remove, don't free
+/* Pass every stored node to nodeFree. Returns 0 on success, -1 on bad arguments or a missing nodeFree. */
+int HashTabDestroy(HashTab *tab);
+/* Find the node matching `key` via keyHash/keyCompare; NULL when there is none. */
+HashNode* GetHashNode(HashTab *tab, const void* key);
+
+#endif // HASH_H
\ No newline at end of file
diff --git a/services/param_service/include/param_server.h b/services/param_service/include/param_server.h
index 7bca45f..91668a9 100644
--- a/services/param_service/include/param_server.h
+++ b/services/param_service/include/param_server.h
@@ -1,22 +1,13 @@
-#ifndef LE_SOCKET_H
-#define LE_SOCKET_H
-#include <stdint.h>
+#ifndef PARAM_SERVER_H
+#define PARAM_SERVER_H
+#include <pthread.h>
+
#include "param_utils.h"
#include "parameter.h"
+#include "list.h"
+#include "base_task.h"
-#define LOOP_MAX_CLIENT 1024
-#define LOOP_MAX_SOCKET 1024
-
-struct EventArgs {
- int epollFd;
- int clientFd;
-};
-
-enum {
- SOCK_UNKNOWN = -1,
- SOCK_DISCONNECTED,
- SOCK_CONNECTED,
-};
+#define MAX_CLIENT 1024
-void ParamServerStart();
-#endif // LE_SOCKET_H
+int ParamServerInit(EventLoop*);
+#endif // PARAM_SERVER_H
diff --git a/services/param_service/include/trie_comm.h b/services/param_service/include/trie_comm.h
index dfd08ec..df1181a 100644
--- a/services/param_service/include/trie_comm.h
+++ b/services/param_service/include/trie_comm.h
@@ -7,10 +7,10 @@
#define WORKSPACE_NAME WORKSPACE_DIR "/param.tmp"
#define WORKSPACE_SIZE (1024*1000)
-typedef struct ListNode {
+typedef struct TrieListNode {
uint32_t prev;
uint32_t next;
-} ListNode;
+} TrieListNode;
typedef struct ParamNode {
uint8_t keyLen;
@@ -19,7 +19,7 @@ typedef struct ParamNode {
} ParamNode;
typedef struct TrieNode {
- ListNode node;
+ TrieListNode node;
uint32_t child;
uint32_t left;
uint32_t right;
@@ -39,5 +39,4 @@ int ParamWorkSpaceInit();
int SetParamtoMem(const char* key, const char* value);
int GetParamFromMem(const char* key, char* value, uint32_t len);
int WaitParam(const char* key, const char* value, uint32_t timeout);
-void DumpParam();
#endif // TRIE_UTILS_H
\ No newline at end of file
diff --git b/services/param_service/include/trie_queue.h b/services/param_service/include/trie_queue.h
new file mode 100644
index 0000000..6c96f96
--- /dev/null
+++ b/services/param_service/include/trie_queue.h
@@ -0,0 +1,84 @@
+#ifndef TRIE_QUEUE_H
+#define TRIE_QUEUE_H
+#include <stdlib.h>
+
+#include "trie_comm.h"
+#include <stdio.h>
+
+/* One heap-allocated entry of a TrieNodeQueue; also used as the queue's embedded sentinel. */
+typedef struct QueueItem {
+ struct QueueItem* prev;
+ struct QueueItem* next;
+ TrieNode* node; /* payload; not used on the sentinel */
+} QueueItem;
+
+/* Circular doubly linked container of TrieNode pointers with push/pop entry points. */
+typedef struct TrieNodeQueue {
+ int size; /* number of stored items */
+ int ready; /* pushes are ignored until this is non-zero */
+ QueueItem queue; /* sentinel; it points at itself while the container is empty */
+ void (*push)(struct TrieNodeQueue*, TrieNode*);
+ TrieNode* (*pop)(struct TrieNodeQueue*);
+} TrieNodeQueue;
+
+/*
+ * All helpers below are `static inline`. With plain `inline` in a header,
+ * C99/C11 provides no external definition for the init functions (link
+ * failure whenever a call is not inlined), while TrieNodePush/TrieNodePop,
+ * which are also declared without `inline`, would get an external
+ * definition in every translation unit that includes this header.
+ */
+static inline void TrieNodePush(TrieNodeQueue* tq, TrieNode* node);
+static inline TrieNode* TrieNodePop(TrieNodeQueue* tq);
+
+/* Stage one: make the container empty and bind its push/pop callbacks. Pushes stay disabled. */
+static inline void TrieQueueFirstStageInit(TrieNodeQueue* tq)
+{
+    if (tq == NULL) {
+        return;
+    }
+    tq->size = 0;
+    tq->ready = 0;
+    tq->queue.next = &(tq->queue);
+    tq->queue.prev = &(tq->queue);
+    tq->push = TrieNodePush;
+    tq->pop = TrieNodePop;
+}
+
+/* Stage two: enable pushes. */
+static inline void TrieQueueSecondStageInit(TrieNodeQueue* tq)
+{
+    if (tq == NULL) {
+        return;
+    }
+    tq->ready = 1;
+}
+
+/*
+ * Append `node` at the tail (just before the sentinel).
+ * The call is silently ignored when an argument is NULL, the container is
+ * not ready yet, or the item cannot be allocated.
+ */
+static inline void TrieNodePush(TrieNodeQueue* tq, TrieNode* node)
+{
+    if (tq == NULL || node == NULL) {
+        return;
+    }
+    if (!tq->ready) {
+        return;
+    }
+    QueueItem* item = (QueueItem*)malloc(sizeof(QueueItem));
+    if (!item) {
+        return;
+    }
+
+    QueueItem* queue = &tq->queue;
+    item->node = node;
+    item->next = queue;
+    item->prev = queue->prev;
+    queue->prev->next = item;
+    queue->prev = item;
+    tq->size++;
+}
+
+/*
+ * Detach the tail item, free it and return its TrieNode; NULL when empty.
+ * NOTE(review): push and pop both work on the tail, so the container behaves
+ * as a stack (LIFO) despite its name - confirm this is what callers expect.
+ */
+static inline TrieNode* TrieNodePop(TrieNodeQueue* tq)
+{
+    if (tq == NULL || tq->size == 0) {
+        return NULL;
+    }
+
+    QueueItem* queue = &tq->queue;
+    QueueItem* item = queue->prev;
+    queue->prev = queue->prev->prev;
+    queue->prev->next = queue;
+    tq->size--;
+    TrieNode* node = item->node;
+    free(item);
+    return node;
+}
+
+#endif // TRIE_QUEUE_H
\ No newline at end of file
diff --git b/services/param_service/src/base_task.c b/services/param_service/src/base_task.c
new file mode 100644
index 0000000..e97d7dc
--- /dev/null
+++ b/services/param_service/src/base_task.c
@@ -0,0 +1,118 @@
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+
+#include "beget_ext.h"
+#include "base_task.h"
+
+static EventLoop *staticloop = NULL;
+
+/*
+ * Register task->taskId with the loop's epoll instance.
+ * Event_Read maps to EPOLLIN and Event_Write to EPOLLOUT; any other value
+ * registers an empty event mask. The epoll_ctl result is discarded.
+ */
+void AddEvent_(EventLoop *loop, BaseTask *task, EventType event)
+{
+    struct epoll_event request = {0};
+
+    switch (event) {
+        case Event_Read:
+            request.events = EPOLLIN;
+            break;
+        case Event_Write:
+            request.events = EPOLLOUT;
+            break;
+        default:
+            break;
+    }
+    request.data.fd = task->taskId;
+
+    (void)epoll_ctl(loop->epollFd, EPOLL_CTL_ADD, task->taskId, &request);
+}
+
+/*
+ * Replace the event mask registered for task->taskId.
+ * Event_Read maps to EPOLLIN and Event_Write to EPOLLOUT; any other value
+ * installs an empty event mask. The epoll_ctl result is discarded.
+ */
+void ModEvent_(EventLoop *loop, BaseTask *task, EventType event)
+{
+    struct epoll_event request = {0};
+
+    switch (event) {
+        case Event_Read:
+            request.events = EPOLLIN;
+            break;
+        case Event_Write:
+            request.events = EPOLLOUT;
+            break;
+        default:
+            break;
+    }
+    request.data.fd = task->taskId;
+
+    (void)epoll_ctl(loop->epollFd, EPOLL_CTL_MOD, task->taskId, &request);
+}
+
+/* Remove task->taskId from the loop's epoll interest list; the epoll_ctl result is discarded. */
+void DelEvent_(EventLoop *loop, BaseTask *task)
+{
+    const int epollFd = loop->epollFd;
+    const int targetFd = task->taskId;
+
+    (void)epoll_ctl(epollFd, EPOLL_CTL_DEL, targetFd, NULL);
+}
+
+/*
+ * Dispatch one readiness notification: look up the task registered under
+ * `fd` in the loop's table and call its handleEvent callback. Descriptors
+ * without a registered task are ignored.
+ */
+void ProcessEvent(EventLoop *loop, int fd, EventType type)
+{
+    BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return, "%s, invalid param", __func__);
+    HashNode *found = GetHashNode(loop->tab, &fd);
+    if (found != NULL) {
+        BaseTask *owner = HASHNODE_ENTRY(found, BaseTask, hashNode);
+        owner->handleEvent(loop, owner, type);
+    }
+}
+
+/*
+ * Dispatch loop: block in epoll_wait and forward each ready descriptor to
+ * ProcessEvent. Readable takes precedence over writable when both are set.
+ *
+ * An interrupted wait (EINTR) is retried. Any other epoll_wait failure ends
+ * the loop instead of spinning forever on the same error, and the event
+ * array is released before returning.
+ */
+void Run_(EventLoop *loop)
+{
+    BEGET_ERROR_CHECK(loop != NULL, return, "invalid param");
+    BEGET_ERROR_CHECK(loop->maxevents > 0, return, "invalid maxevents");
+    struct epoll_event *events = (struct epoll_event*)calloc((size_t)loop->maxevents, sizeof(struct epoll_event));
+    BEGET_ERROR_CHECK(events != NULL, return, "fail to allocate space to epoll event");
+
+    while (1) {
+        int num = epoll_wait(loop->epollFd, events, loop->maxevents, -1);
+        if (num < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            BEGET_LOGE("epoll_wait failed, errno [%d]", errno);
+            break;
+        }
+        for (int i = 0; i < num; ++i) {
+            if (events[i].events & EPOLLIN) {
+                ProcessEvent(loop, events[i].data.fd, Event_Read);
+            } else if (events[i].events & EPOLLOUT) {
+                ProcessEvent(loop, events[i].data.fd, Event_Write);
+            }
+        }
+    }
+    free(events);
+}
+
+/*
+ * Allocate an EventLoop, create its epoll instance and bind the default
+ * operations. On success stores the loop in *loop and returns 0; returns -1
+ * on a NULL argument, allocation failure or epoll_create failure.
+ */
+static int CreateLoop(EventLoop **loop)
+{
+    if (loop == NULL) {
+        return -1;
+    }
+    EventLoop *handle = (EventLoop*)calloc(1, sizeof(EventLoop));
+    BEGET_ERROR_CHECK(handle != NULL, return -1, "fail to allocate space for EventLoop");
+    handle->epollFd = epoll_create(DEFAULT_MAX_EVENTS);
+    /* epoll_create reports failure as -1; descriptor 0 is a valid result and must not be rejected. */
+    BEGET_ERROR_CHECK(handle->epollFd >= 0, free(handle);return -1, "failed to create epoll. errno [%d]", errno);
+    handle->Run = Run_;
+    handle->AddEvent = AddEvent_;
+    handle->ModEvent = ModEvent_;
+    handle->DelEvent = DelEvent_;
+    handle->maxevents = DEFAULT_MAX_EVENTS;
+    /* The task table is attached later by the loop's user (assignment, not the former no-op comparison). */
+    handle->tab = NULL;
+    *loop = handle;
+    return 0;
+}
+
+
+/*
+ * Allocate a zero-initialized task object of `size` bytes.
+ * `size` is the size of the concrete task type, which embeds a BaseTask as
+ * its first member, so anything smaller than sizeof(BaseTask) is rejected
+ * rather than written past. `loop` is currently unused.
+ * Returns NULL on an invalid size or allocation failure.
+ */
+BaseTask* CreateBaseTask(EventLoop *loop, uint32_t size)
+{
+    (void)loop;
+    BEGET_ERROR_CHECK(size >= sizeof(BaseTask), return NULL, "invalid base task size");
+    BaseTask *task = (BaseTask*)calloc(1, size);
+    BEGET_ERROR_CHECK(task != NULL, return NULL, "fail to create base task");
+    task->hashNode.next = NULL;
+    return task;
+}
+
+/* Return the shared default loop, creating it on the first call; NULL if creation fails. */
+EventLoop* GetDefaultLoop()
+{
+    if (staticloop == NULL) {
+        int status = CreateLoop(&staticloop);
+        BEGET_ERROR_CHECK(status == 0, return NULL, "fail to create default loop");
+    }
+    return staticloop;
+}
+
+/* Start dispatching on `loop`; does nothing when the loop or its Run callback is missing. */
+void RunLoop(EventLoop *loop)
+{
+    if (loop == NULL || loop->Run == NULL) {
+        return;
+    }
+    loop->Run(loop);
+}
+
+/* Placeholder: the body is empty, so calling it has no effect on a running loop. */
+void StopLoop()
+{
+
+}
\ No newline at end of file
diff --git b/services/param_service/src/hash.c b/services/param_service/src/hash.c
new file mode 100644
index 0000000..ad3021c
--- /dev/null
+++ b/services/param_service/src/hash.c
@@ -0,0 +1,106 @@
+#include "hash.h"
+#include "beget_ext.h"
+
+#include <stdlib.h>
+
+/*
+ * Allocate a hash table with info->maxBucket empty buckets and copy the
+ * callbacks from `info`.
+ *
+ * Returns 0 and stores the table in *tab on success. Returns -1 on NULL
+ * arguments, a non-positive bucket count (every lookup reduces the hash
+ * modulo maxBucket) or allocation failure; in the latter two cases *tab is
+ * set to NULL.
+ */
+int HashTabCreate(HashTab **tab, HashInfo *info)
+{
+    BEGET_ERROR_CHECK(tab != NULL && info != NULL, return -1, "%s : invalid arguments", __func__);
+    *tab = NULL;
+    BEGET_ERROR_CHECK(info->maxBucket > 0, return -1, "%s : invalid bucket count", __func__);
+    /* buckets holds chain-head pointers, so size the array by the pointer type. */
+    *tab = (HashTab*)calloc(1, sizeof(HashTab) + sizeof(HashNode*) * (size_t)info->maxBucket);
+    BEGET_ERROR_CHECK(*tab != NULL, return -1, "fail to calloc hash tab");
+    (*tab)->nodeHash = info->nodeHash;
+    (*tab)->keyHash = info->keyHash;
+    (*tab)->nodeCompare = info->nodeCompare;
+    (*tab)->keyCompare = info->keyCompare;
+    (*tab)->nodeFree = info->nodeFree;
+    (*tab)->maxBucket = info->maxBucket;
+    return 0;
+}
+
+/* Walk the chain starting at `head` and return the first node that nodeCompare reports equal (0) to `target`, or NULL. */
+static HashNode* CheckHashNodeIsExist(HashTab *tab, HashNode* head, HashNode *target)
+{
+    for (HashNode *cursor = head; cursor != NULL; cursor = cursor->next) {
+        if (tab->nodeCompare(cursor, target) == 0) {
+            return cursor;
+        }
+    }
+    return NULL;
+}
+
+/*
+ * Insert `node` at the head of its bucket chain.
+ * Returns 0 on success, -1 on invalid arguments or when a node comparing
+ * equal is already stored.
+ */
+int HashNodeAdd(HashTab *tab, HashNode *node)
+{
+    BEGET_ERROR_CHECK(tab != NULL && node != NULL, return -1, "%s : invalid param", __func__);
+    BEGET_ERROR_CHECK(tab->maxBucket > 0, return -1, "%s : invalid bucket count", __func__);
+    int hashCode = tab->nodeHash(node);
+    /*
+     * Take the magnitude in unsigned arithmetic: negating INT_MIN as a signed
+     * int overflows and would leave a negative bucket index.
+     */
+    unsigned int magnitude = (hashCode < 0) ? (0u - (unsigned int)hashCode) : (unsigned int)hashCode;
+    unsigned int index = magnitude % (unsigned int)tab->maxBucket;
+    HashNode *tmp = CheckHashNodeIsExist(tab, tab->buckets[index], node);
+    if (tmp != NULL) {
+        BEGET_LOGE("node was exist");
+        return -1;
+    }
+    node->next = tab->buckets[index];
+    tab->buckets[index] = node;
+    return 0;
+}
+
+/*
+ * Unlink the first stored node that compares equal to `node`.
+ * The node is only detached from its chain, never freed. Nothing happens
+ * when no match exists.
+ */
+void HashNodeRemove(HashTab *tab, HashNode *node)
+{
+    BEGET_ERROR_CHECK(tab != NULL && node != NULL, return, "%s : invalid param", __func__);
+    BEGET_ERROR_CHECK(tab->maxBucket > 0, return, "%s : invalid bucket count", __func__);
+    int hashCode = tab->nodeHash(node);
+    /*
+     * Take the magnitude in unsigned arithmetic: negating INT_MIN as a signed
+     * int overflows and would leave a negative bucket index.
+     */
+    unsigned int magnitude = (hashCode < 0) ? (0u - (unsigned int)hashCode) : (unsigned int)hashCode;
+    unsigned int index = magnitude % (unsigned int)tab->maxBucket;
+
+    /* `link` always points at the pointer that refers to the node under inspection. */
+    HashNode **link = &tab->buckets[index];
+    while (*link != NULL) {
+        if (tab->nodeCompare(*link, node) == 0) {
+            *link = (*link)->next;
+            return;
+        }
+        link = &(*link)->next;
+    }
+}
+
+/*
+ * Hand every stored node to the table's nodeFree callback and leave all
+ * buckets empty. Returns 0 on success, -1 when `tab` is NULL or no nodeFree
+ * callback is set.
+ * NOTE(review): the table object itself is not released here - confirm who
+ * owns and frees it.
+ */
+int HashTabDestroy(HashTab *tab)
+{
+    BEGET_ERROR_CHECK(tab != NULL, return -1, "%s : invalid arguments", __func__);
+    if (tab->nodeFree == NULL) {
+        BEGET_LOGE("%s : can not find node free func", __func__);
+        return -1;
+    }
+    for (int bucket = 0; bucket < tab->maxBucket; ++bucket) {
+        HashNode **slot = &tab->buckets[bucket];
+        while (*slot != NULL) {
+            HashNode *following = (*slot)->next;
+            tab->nodeFree(*slot);
+            *slot = following;
+        }
+    }
+    return 0;
+}
+
+/*
+ * Look up the node matching `key`.
+ * The bucket is chosen with keyHash and candidates are tested with
+ * keyCompare (0 means match). Returns the node, or NULL when the arguments
+ * are invalid or nothing matches.
+ */
+HashNode* GetHashNode(HashTab *tab, const void *key)
+{
+    BEGET_ERROR_CHECK(tab != NULL && key != NULL, return NULL, "%s : invalid param", __func__);
+    BEGET_ERROR_CHECK(tab->maxBucket > 0, return NULL, "%s : invalid bucket count", __func__);
+    int hashCode = tab->keyHash(key);
+    /*
+     * Take the magnitude in unsigned arithmetic: negating INT_MIN as a signed
+     * int overflows and would leave a negative bucket index.
+     */
+    unsigned int magnitude = (hashCode < 0) ? (0u - (unsigned int)hashCode) : (unsigned int)hashCode;
+    unsigned int index = magnitude % (unsigned int)tab->maxBucket;
+    for (HashNode *tmp = tab->buckets[index]; tmp != NULL; tmp = tmp->next) {
+        if (tab->keyCompare(tmp, key) == 0) {
+            return tmp;
+        }
+    }
+    return NULL;
+}
\ No newline at end of file
diff --git a/services/param_service/src/le_utils.c b/services/param_service/src/le_utils.c
index 8fa0401..c4f5b69 100644
--- a/services/param_service/src/le_utils.c
+++ b/services/param_service/src/le_utils.c
@@ -3,8 +3,6 @@
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
-#include <sys/stat.h>
-#include "securec.h"
#define MAX_BUF 1024
diff --git a/services/param_service/src/main.c b/services/param_service/src/main.c
index 1daa246..58ef853 100644
--- a/services/param_service/src/main.c
+++ b/services/param_service/src/main.c
@@ -6,8 +6,7 @@
#include <sys/prctl.h>
#include <string.h>
#include "beget_ext.h"
-#include "trie_comm.h"
-#include "param_server.h"
+#include "base_task.h";
int main(int argc, char* argv[])
{
@@ -17,11 +16,18 @@ int main(int argc, char* argv[])
return -1;
}
- int ret = ParamWorkSpaceInit();
+ EventLoop *defaultLoop = GetDefaultLoop();
+ int ret = ParamServerInit(defaultLoop);
+ if (ret != 0) {
+ BEGET_LOGE("ParamServerInit failed\n");
+ }
+
+ ret = ParamWorkSpaceInit();
if (ret != 0) {
BEGET_LOGE("ParamWorkSpaceInit failed\n");
exit(EXIT_FAILURE);
}
- ParamServerStart();
+
+ RunLoop(defaultLoop);
return 0;
}
diff --git a/services/param_service/src/param_server.c b/services/param_service/src/param_server.c
index 8b38d0f..9ae06a3 100644
--- a/services/param_service/src/param_server.c
+++ b/services/param_service/src/param_server.c
@@ -3,13 +3,13 @@
#include <sys/un.h>
#include <sys/stat.h>
#include <sys/epoll.h>
+#include <sys/timerfd.h>
#include <errno.h>
#include <unistd.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <pthread.h>
#include "beget_ext.h"
#include "param_server.h"
@@ -19,153 +19,445 @@
#include "securec.h"
#include "le_utils.h"
-void HandleEvent(struct EventArgs* args)
-{
- int clientFd = args->clientFd;
- int epollFd = args->epollFd;
- struct ParamRequestMsg* pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + PARAM_VALUE_LEN_MAX);
- BEGET_ERROR_CHECK(pmsg != NULL, return, "failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg) + PARAM_VALUE_LEN_MAX);
- pmsg->datasize = PARAM_VALUE_LEN_MAX;
- int status = SOCK_CONNECTED;
- while (1) {
- int ret = recv(clientFd, pmsg, sizeof(struct ParamRequestMsg) + pmsg->datasize, 0);
- if (ret == 0) {
- status = SOCK_DISCONNECTED;
- break;
- } else if (ret < 0) {
- if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
- continue;
- }
- status = SOCK_UNKNOWN;
- break;
- } else {
- break;
- }
+static int ParamTimerCreate(EventLoop *loop);
+/*
+ * State of the parameter-wait timer.
+ * NOTE(review): presumably isCreate records whether the timer task already
+ * exists and `task` is that timer task - confirm against ParamTimerCreate,
+ * which is not visible here.
+ */
+struct ParamTimer {
+ int8_t isCreate;
+ BaseTask *task;
+};
+
+static struct ParamTimer ptimer = {0, NULL};
+
+/*
+ * Return the head of the list of pending waiters, allocating it as an empty
+ * circular list on first use. Returns NULL when that allocation fails; the
+ * next call then tries again.
+ */
+static ListNode* GetAwaitHead()
+{
+ static ListNode *awaitHead = NULL;
+ if (awaitHead == NULL) {
+ ListNode *head = (ListNode*)malloc(sizeof(ListNode));
+ BEGET_ERROR_CHECK(head != NULL, return NULL, "%s, failed to allocate space", __func__);
+ head->next = head;
+ head->prev = head;
+ awaitHead = head;
 }
+ return awaitHead;
+}
- if (status != SOCK_CONNECTED) {
- epoll_ctl(epollFd, EPOLL_CTL_DEL, clientFd, NULL);
- free(pmsg);
+/*
+ * Tear down a task: drop it from epoll, close its descriptor, unlink it from
+ * the loop's task table and free the task object. No-op on NULL arguments.
+ * NOTE(review): memory reachable through the task (for example a ClientTask's
+ * info member) is not released here - confirm ownership.
+ */
+static void CloseTask(LoopHandle handle, BaseTask *task)
+{
+ if (handle == NULL || task == NULL) {
 return;
 }
+ EventLoop *loop = (EventLoop*)handle;
+ loop->DelEvent(loop, task);
+ close(task->taskId);
+ HashNodeRemove(loop->tab, &task->hashNode);
+ free(task);
+}
+
+/*
+ * Receive up to `length` bytes from the task's socket, appending them after
+ * the bytes already held in `buf` and advancing buf->datasize by the amount
+ * read. Returns the recv() result (0 on orderly shutdown, negative on error)
+ * or -1 on invalid arguments.
+ * NOTE(review): the buffer's capacity is not known here; the caller must
+ * ensure datasize + length fits.
+ */
+int32_t SocketRecv(LoopHandle handle, BaseTask *task, EventBuffer *buf, size_t length)
+{
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL && buf != NULL, return -1, "%s : invalid param", __func__);
+    uint8_t *writePos = buf->data + buf->datasize;
+    int32_t received = (int32_t)recv(task->taskId, writePos, length, 0);
+    if (received <= 0) {
+        return received;
+    }
+    buf->datasize += received;
+    return received;
+}
- int ret;
- struct ParamRespMsg* respmsg = (struct ParamRespMsg*)malloc(sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX);
- BEGET_ERROR_CHECK(respmsg != NULL, free(pmsg);return, "Failed to malloc ParamRespMsg");
- bzero(respmsg, sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX);
- switch(pmsg->type) {
+/*
+ * Send the buf->datasize bytes held in `buf` over the task's socket with a
+ * single send() call. Returns the send() result, or -1 on invalid arguments.
+ */
+int32_t SocketSend(LoopHandle handle, BaseTask *task, EventBuffer *buf)
+{
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL && buf != NULL, return -1, "%s : invalid param", __func__);
+    return (int32_t)send(task->taskId, buf->data, buf->datasize, 0);
+}
+
+/*
+ * Wake every waiter whose condition equals "key=value".
+ *
+ * Each matching WaitInfo is unlinked from the pending list and its client
+ * task is re-armed for writing so the reply can be sent.
+ *
+ * Differences from the naive form:
+ *  - the list head is checked for NULL before it is dereferenced;
+ *  - the condition string is built with snprintf, because key and value may
+ *    together exceed the buffer once '=' and the terminator are added;
+ *  - the successor is saved before a node is unlinked, so iteration never
+ *    walks through a removed node;
+ *  - a waiter whose task is no longer in the table is unlinked and skipped
+ *    instead of turning a NULL lookup into a bogus task pointer.
+ */
+static void CheckAndTriggerWait(LoopHandle handle, char *key, char *value)
+{
+    BEGET_ERROR_CHECK(handle != NULL && key != NULL && value != NULL, return, "%s, invalid value", __func__);
+    EventLoop *loop = (EventLoop*)handle;
+    char fullStr[PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX] = {0};
+    int written = snprintf(fullStr, sizeof(fullStr), "%s=%s", key, value);
+    BEGET_ERROR_CHECK(written >= 0 && (size_t)written < sizeof(fullStr), return, "%s, condition too long", __func__);
+    ListNode *head = GetAwaitHead();
+    BEGET_ERROR_CHECK(head != NULL, return, "%s, invalid list node", __func__);
+    ListNode *tmp = head->next;
+    while (tmp != head) {
+        ListNode *next = tmp->next;
+        WaitInfo *info = ListEntry(tmp, WaitInfo, anchor);
+        if (strcmp(info->condition, fullStr) == 0) {
+            HashNode *hashNode = GetHashNode(loop->tab, &info->taskId);
+            OH_ListRemove(&info->anchor);
+            if (hashNode != NULL) {
+                BaseTask *task = HASHNODE_ENTRY(hashNode, BaseTask, hashNode);
+                loop->AddEvent(loop, task, Event_Write);
+            } else {
+                /* NOTE(review): the WaitInfo of a vanished task is left allocated; confirm who owns it. */
+                BEGET_LOGE("%s, waiting task %d no longer exists", __func__, info->taskId);
+            }
+        }
+        tmp = next;
+    }
+}
+
+/*
+ * Execute one decoded client request and queue its reply.
+ *
+ * SET/GET/unknown requests, and WAIT requests whose condition already holds,
+ * produce a reply buffer stored in task->info.content; the task is then
+ * switched to write mode. A WAIT request whose condition does not hold yet
+ * parks the client instead: a WaitInfo is linked into the pending list, the
+ * task is removed from epoll and the wait timer is started.
+ *
+ * Fixes over the earlier form:
+ *  - the WaitInfo is sized for the full "key=value" string plus terminator
+ *    (it used to be sized by msg->datasize only, overflowing the heap);
+ *  - the task becomes WAIT_TYPE only once the WaitInfo exists, so a failed
+ *    allocation falls back to a normal error reply instead of leaving a reply
+ *    buffer in the union while the task claims to hold a WaitInfo.
+ *
+ * NOTE(review): msg->key and msg->data are treated as NUL-terminated strings
+ * throughout; confirm the receive path guarantees that.
+ */
+static void HandleMessageInner_(LoopHandle handle, ClientTask *task, ParamReqMsg *msg)
+{
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL && msg != NULL, return, "%s : invalid param", __func__);
+    EventLoop *loop = (EventLoop*)handle;
+    uint32_t flag;
+    char data[PARAM_VALUE_LEN_MAX] = {0};
+    switch(msg->type) {
 case SET_PARAMETER: {
- ret = SetParamtoMem(pmsg->key, pmsg->data);
- respmsg->flag = ret;
+            task->type = NORMAL_TYPE;
+            flag = SetParamtoMem(msg->key, msg->data);
+            if (flag == 0) {
+                CheckAndTriggerWait(handle, msg->key, msg->data);
+            }
 break;
 }
 case GET_PARAMETER: {
- if (pmsg->datasize > PARAM_VALUE_LEN_MAX) {
- pmsg->datasize = PARAM_VALUE_LEN_MAX;
- }
- ret = GetParamFromMem(pmsg->key, respmsg->data, pmsg->datasize);
- respmsg->flag = ret;
- if (ret == 0) {
- respmsg->datasize = strlen(respmsg->data);
- }
+            task->type = NORMAL_TYPE;
+            flag = GetParamFromMem(msg->key, data, PARAM_VALUE_LEN_MAX);
 break;
 }
 case WAIT_PARAMETER: {
- ret = WaitParam(pmsg->key, pmsg->data, pmsg->timeout);
- respmsg->flag = ret;
+            task->type = NORMAL_TYPE;
+            flag = WaitParam(msg->key, msg->data, msg->timeout);
+            if (flag != 0) {
+                ListNode *awaitHead = GetAwaitHead();
+                if (awaitHead == NULL) {
+                    break;
+                }
+                /* Room for "key=value" and the terminating NUL. */
+                size_t conditionLen = strlen(msg->key) + strlen(msg->data) + 2;
+                WaitInfo *info = (WaitInfo*)malloc(sizeof(WaitInfo) + conditionLen);
+                if (info == NULL) {
+                    BEGET_LOGE("%s, failed to allocate wait info space", __func__);
+                    break;
+                }
+                (void)snprintf(info->condition, conditionLen, "%s=%s", msg->key, msg->data);
+                info->timeout = msg->timeout;
+                info->taskId = task->base.taskId;
+                task->type = WAIT_TYPE;
+                task->info.extraInfo = (void*)info;
+                OH_ListAddTail(awaitHead, &info->anchor);
+                loop->DelEvent(loop, (BaseTask*)task);
+                ParamTimerCreate(loop);
+                return;
+            }
 break;
 }
 default:
- respmsg->flag = -1;
+            task->type = NORMAL_TYPE;
+            flag = -1;
 break;
 }
- ret = send(clientFd, respmsg, sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX, 0);
- if (ret < 0) {
- BEGET_LOGE("Failed to send data to : %d\n", clientFd);
+    size_t dataLen = strlen(data);
+    EventBuffer *buf;
+    if (dataLen > 0) {
+        buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg) + dataLen + 1);
+        BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__);
+        buf->datasize = sizeof(ParamRespMsg) + dataLen + 1;
+        ParamRespMsg *respmsg = (ParamRespMsg*)(buf->data);
+        respmsg->flag = flag;
+        respmsg->datasize = dataLen;
+        (void)memcpy_s(respmsg->data, dataLen + 1, data, dataLen + 1);
+    } else {
+        buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg));
+        BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__);
+        buf->datasize = sizeof(ParamRespMsg);
+        ParamRespMsg *respmsg = (ParamRespMsg*)(buf->data);
+        respmsg->flag = flag;
 }
- free(pmsg);
- free(respmsg);
+
+    task->info.content = buf;
+    loop->ModEvent(loop, (BaseTask*)task, Event_Write);
 }
-int CtlAdd(int epollfd, int fd, uint32_t event)
+/*
+ * Write-ready handler for a client task: send the pending reply, release it
+ * and switch the task back to read mode.
+ *
+ * For a task that was waiting (WAIT_TYPE) the reply is built here from its
+ * WaitInfo: flag 0 when the stored timeout is still positive, -1 otherwise.
+ * The WaitInfo is then freed and the task becomes a normal task owning the
+ * reply buffer. That hand-over means a transient send failure
+ * (EINTR/EAGAIN/EWOULDBLOCK) simply retries with the same buffer on the next
+ * write event, instead of leaking the buffer and touching the freed WaitInfo
+ * again. After a send attempt that is not retried, the buffer is freed and
+ * the task's pointer to it is cleared.
+ */
+static void OnSendMessage(LoopHandle handle, BaseTask *task)
 {
- struct epoll_event ev = {
- .data.fd = fd,
- .events = event,
- };
- int ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
- BEGET_ERROR_CHECK(ret == 0, return -1, "failed to add epoll_ctl fd %d. errno [%d]", fd, errno);
- return 0;
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+    EventLoop *loop = (EventLoop*)handle;
+    ClientTask *clienttask = (ClientTask*)task;
+
+    if (clienttask->type == WAIT_TYPE) {
+        BEGET_ERROR_CHECK(clienttask->info.extraInfo != NULL, return, "no message to send");
+        WaitInfo *info = (WaitInfo*)clienttask->info.extraInfo;
+        EventBuffer *reply = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg));
+        BEGET_ERROR_CHECK(reply != NULL, return, "%s, failed to allocate buf", __func__);
+        reply->datasize = sizeof(ParamRespMsg);
+        ParamRespMsg *respmsg = (ParamRespMsg*)(reply->data);
+        respmsg->datasize = 0;
+        if (info->timeout > 0) {
+            respmsg->flag = 0;
+        } else {
+            respmsg->flag = -1;
+        }
+        free(info);
+        /* From here on the task owns an ordinary pending reply. */
+        clienttask->type = NORMAL_TYPE;
+        clienttask->info.content = reply;
+    }
+
+    BEGET_ERROR_CHECK(clienttask->info.content != NULL, return, "no message to send");
+    EventBuffer *buf = clienttask->info.content;
+
+    int32_t ret = SocketSend(handle, task, buf);
+    if (ret < 0) {
+        if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+            BEGET_LOGE("%s, resource busy, try again", __func__);
+            return;
+        }
+        BEGET_LOGE("%s, SocketSend fail, errno : %d", __func__, errno);
+    }
+
+    free(buf);
+    clienttask->info.content = NULL;
+    loop->ModEvent(loop, task, Event_Read);
 }
-void StartEpoll(int listenfd)
-{
- int epollfd = epoll_create(LOOP_MAX_SOCKET);
- BEGET_ERROR_CHECK(epollfd > 0, return, "failed to create epoll. errno [%d]", errno);
-
- int ret = CtlAdd(epollfd, listenfd, EPOLLIN);
- BEGET_ERROR_CHECK(ret == 0, close(epollfd); return, "failed to CtlAdd");
-
- struct epoll_event *events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * LOOP_MAX_SOCKET);
- BEGET_ERROR_CHECK(events != NULL, close(epollfd); return, "failed to alloc memory for epoll_event");
-
- while(1) {
- int number = epoll_wait(epollfd, events, LOOP_MAX_SOCKET, -1);
- for (int index = 0; index < number; ++index) {
- int fd_ = events[index].data.fd;
- if (fd_ == listenfd) {
- struct sockaddr_un clientAddr;
- socklen_t addrlen = sizeof(clientAddr);
- bzero(&clientAddr, addrlen);
- int clientfd = accept(listenfd, (struct sockaddr*)&clientAddr, &addrlen);
- BEGET_ERROR_CHECK(clientfd >= 0, close(epollfd); return, "failed to accept socket");
- SetNoBlock(clientfd);
- SetCloseExec(clientfd);
- ret = CtlAdd(epollfd, clientfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
- BEGET_ERROR_CHECK(ret == 0, continue, "failed to CtlAdd");
- } else {
- pthread_t threadId;
- struct EventArgs args = {epollfd, fd_};
- ret = pthread_create(&threadId, NULL, (void*)HandleEvent, (void*)&args);
- BEGET_ERROR_CHECK(ret == 0, continue, "faild to create pthread to handle parameter event");
+static void OnRecvMessage(LoopHandle handle, BaseTask *task) // Event_Read handler: read one ParamReqMsg (header, then optional data) and dispatch it
+{
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+    uint32_t payload = (uint32_t)sizeof(ParamReqMsg);
+    EventBuffer *buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + payload);
+    BEGET_ERROR_CHECK(buf != NULL, return, "%s, fail to allocate recv buf", __func__);
+
+    int32_t recvlen = payload;
+    while (buf->datasize != payload) { // phase 1: the fixed-size request header
+        int32_t ret = SocketRecv(handle, task, buf, recvlen);
+        if (ret < 0) {
+            if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+                continue;
+            }
+            BEGET_LOGE("Process SocketRecv fail, errno : %d", errno);
+            goto CLOSE;
+        } else if (ret == 0) {
+            BEGET_LOGI("%d, client normal exist", task->taskId);
+            task->close(handle, task);
+            goto CLOSE;
+        }
+        recvlen = payload - buf->datasize;
+    }
+
+    ParamReqMsg *reqmsg = buf->data;
+    if (reqmsg->datasize > 0) { // phase 2: the variable-length data announced by the header
+        EventBuffer *tmp = (EventBuffer*)calloc(1, sizeof(EventBuffer) + payload + reqmsg->datasize);
+        BEGET_ERROR_CHECK(tmp != NULL, goto CLOSE, "%s, fail to grow recv buf", __func__);
+        (void)memcpy_s(tmp, sizeof(EventBuffer) + payload, buf, sizeof(EventBuffer) + payload); // buffer header AND the bytes already received
+        free(buf);
+        buf = tmp;
+        reqmsg = buf->data;
+        recvlen = reqmsg->datasize;
+        payload += reqmsg->datasize;
+        while (buf->datasize != payload) {
+            int32_t ret = SocketRecv(handle, task, buf, recvlen);
+            if (ret < 0) {
+                if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+                    continue;
+                }
+                BEGET_LOGE("Process SocketRecv fail, errno : %d", errno);
+                goto CLOSE;
+            } else if (ret == 0) {
+                BEGET_LOGI("%d, client normal exist", task->taskId);
+                task->close(handle, task);
+                goto CLOSE;
}
+            recvlen = payload - buf->datasize;
}
}
- close(epollfd);
- free(events);
+
+    HandleMessageInner_(handle, (ClientTask*)task, reqmsg);
+CLOSE:
+    free(buf);
}
-int CreateSocket()
+static void HandleClientEvent(LoopHandle handle, BaseTask *task, EventType type)
{
- unlink(PIPE_NAME);
- int listenfd = socket(PF_UNIX, SOCK_STREAM, 0);
- BEGET_ERROR_CHECK(listenfd > 0, return -1, "failed to create socket. errno [%d]", errno);
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+    ClientTask *client = (ClientTask*)task;
+    // Anything other than a read or write notification is rejected up front.
+    if (type != Event_Read && type != Event_Write) {
+        BEGET_LOGE("%s, invalid type", __func__);
+        return;
+    }
+    // Readable -> recvMessage callback, writable -> sendMessage callback.
+    (type == Event_Read ? client->recvMessage : client->sendMessage)(handle, task);
+}
+
+static void ServerInCommingConnect(LoopHandle handle, BaseTask *task) // accept one pending client and register it for Event_Read
+{
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+    EventLoop *loop = (EventLoop*)handle;
+
+    struct sockaddr_un peer;
+    socklen_t peerLen = sizeof(peer);
+    int connfd = accept(task->taskId, (struct sockaddr*)&peer, &peerLen);
+    BEGET_ERROR_CHECK(connfd >= 0, return, "%s : failed to accept socket, %d", __func__, errno);
+    BEGET_LOGV("client fd = %d", connfd);
+    SetNoBlock(connfd);
+    SetCloseExec(connfd);
+    ClientTask *client = (ClientTask*)CreateBaseTask(loop, sizeof(ClientTask));
+    BEGET_ERROR_CHECK(client != NULL, close(connfd); return, "%s : failed to create client task. errno [%d]", __func__, errno);
+    client->recvMessage = OnRecvMessage;
+    client->sendMessage = OnSendMessage;
+    client->disconnect = NULL;
+    client->base.handleEvent = HandleClientEvent;
+    client->base.close = CloseTask;
+    client->base.taskId = connfd; // the accepted fd is the task's id and therefore its key in loop->tab
+    HashNodeAdd(loop->tab, &client->base.hashNode);
+    loop->AddEvent(loop, (BaseTask*)client, Event_Read);
+}
+
+static void HandleServerEvent(LoopHandle handle, BaseTask *task, EventType type)
+{
+    // Every event on the listening task is forwarded to incommingConnect; the event type is not inspected.
+    (void)type;
+    BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+    ((ServerTask*)task)->incommingConnect(handle, task);
+}
+
+static void CheckWaitParamTimeout(LoopHandle handle, uint64_t expire) // age every pending wait by `expire`; schedule a reply for those that ran out
+{
+    BEGET_ERROR_CHECK(handle != NULL, return, "%s, invalid valud", __func__);
+    EventLoop *loop = (EventLoop*)handle;
+    ListNode *head = GetAwaitHead();
+    BEGET_ERROR_CHECK(head != NULL, return, "%s, invalid list node", __func__);
+    // `next` is captured before the body runs because the body may unlink `tmp` from the list.
+    for (ListNode *tmp = head->next, *next = tmp->next; tmp != head; tmp = next, next = tmp->next) {
+        WaitInfo *info = ListEntry(tmp, WaitInfo, anchor);
+        if (info->timeout > 0) {
+            info->timeout = (info->timeout > expire) ? (info->timeout - expire) : 0; // clamp at zero, never wrap
+            continue;
+        }
+        OH_ListRemove(&info->anchor);
+        HashNode *hashNode = GetHashNode(loop->tab, &info->taskId);
+        BEGET_ERROR_CHECK(hashNode != NULL, continue, "%s, task of expired wait not found", __func__);
+        BaseTask *task = HASHNODE_ENTRY(hashNode, BaseTask, hashNode);
+        loop->AddEvent(loop, task, Event_Write);
+    }
+    if (head->next == head) { // nothing left to wait for: the periodic timer is no longer needed
+        ptimer.task->close(handle, ptimer.task);
+        ptimer.isCreate = 0;
+    }
+}
+static void HandleTimerEvent(LoopHandle handle, BaseTask *task, EventType type) // timerfd readable: consume the expiration count and age the waits
+{
+    (void)type;
+    uint64_t exp = 0; // only a complete 8-byte read yields a valid count, so anything else aborts below
+    BEGET_ERROR_CHECK(task != NULL && read(task->taskId, &exp, sizeof(exp)) == (ssize_t)sizeof(exp), return, "%s, invalid task or timerfd read failed", __func__);
+    CheckWaitParamTimeout(handle, exp);
+    BEGET_LOGI("Entry timer task, exp : %llu", (unsigned long long)exp);
+}
+
+static int ParamServerCreate(EventLoop *loop) // create, bind and register the listening unix socket; 0 on success, -1 on failure
+{
+    BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return -1, "%s : invalid loop", __func__);
+    int server = socket(PF_UNIX, SOCK_STREAM, 0);
+    BEGET_ERROR_CHECK(server >= 0, return -1, "failed to create socket. errno [%d]", errno);
struct sockaddr_un serverAddr;
-
(void)memset_s(&serverAddr, sizeof(serverAddr), 0, sizeof(serverAddr));
serverAddr.sun_family = AF_UNIX;
strncpy(serverAddr.sun_path, PIPE_NAME, sizeof(serverAddr.sun_path));
uint32_t size = offsetof(struct sockaddr_un, sun_path) + strlen(PIPE_NAME);
- int ret = bind(listenfd, (struct sockaddr*)&serverAddr, size);
- BEGET_ERROR_CHECK(ret >= 0, close(listenfd); return -1, "failed to bind socket. errno [%d]", errno);
+    int ret = bind(server, (struct sockaddr*)&serverAddr, size);
+    BEGET_ERROR_CHECK(ret >= 0, close(server); return -1, "failed to bind socket. errno [%d]", errno);
+    SetNoBlock(server);
+    SetCloseExec(server);
+    ret = listen(server, MAX_CLIENT);
+    BEGET_ERROR_CHECK(ret >= 0, close(server); return -1, "failed to listen socket. errno [%d]", errno);
+    ret = chmod(PIPE_NAME, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); // the socket file exists only after bind()
+    BEGET_ERROR_CHECK(ret == 0, close(server); return -1, "failed to chmod %s. errno [%d]", PIPE_NAME, errno);
+    ServerTask *servertask = (ServerTask*)CreateBaseTask(loop, sizeof(ServerTask));
+    BEGET_ERROR_CHECK(servertask != NULL, close(server); return -1, "failed to create server task. errno [%d]", errno);
+    servertask->base.taskId = server;
+    servertask->base.close = CloseTask;
+    servertask->base.handleEvent = HandleServerEvent;
+    servertask->incommingConnect = ServerInCommingConnect;
+    HashNodeAdd(loop->tab, &servertask->base.hashNode);
+    loop->AddEvent(loop, (BaseTask*)servertask, Event_Read);
+    return 0;
+}
+
+static int ParamTimerCreate(EventLoop *loop) // create the 1-second timerfd task once; 0 on success or if it already exists, -1 on failure
+{
+    if (ptimer.isCreate) {
+        return 0;
+    }
+    BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return -1, "%s : invalid loop", __func__);
+    int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
+    BEGET_ERROR_CHECK(timerfd >= 0, return -1, "failed to create timerfd. errno [%d]", errno);
+    struct itimerspec timespec = {
+        .it_interval.tv_sec = 1,
+        .it_interval.tv_nsec = 0,
+        .it_value.tv_sec = 1,
+        .it_value.tv_nsec = 0,
+    };
+    int ret = timerfd_settime(timerfd, 0, &timespec, NULL);
+    BEGET_ERROR_CHECK(ret == 0, close(timerfd); return -1, "failed to set timerfd. errno [%d]", errno);
+    ptimer.task = (BaseTask*)CreateBaseTask(loop, sizeof(BaseTask));
+    BEGET_ERROR_CHECK(ptimer.task != NULL, close(timerfd); return -1, "failed to create timer task. errno [%d]", errno);
+    ptimer.task->taskId = timerfd;
+    ptimer.task->handleEvent = HandleTimerEvent;
+    ptimer.task->close = CloseTask;
+    HashNodeAdd(loop->tab, &ptimer.task->hashNode);
+    loop->AddEvent(loop, ptimer.task, Event_Read);
+    ptimer.isCreate = 1;
+    return 0;
+}
+
+static int NodeHash(HashNode *node) // hash callback: a node hashes to the taskId of the task embedding it
+{
+    BaseTask *owner = HASHNODE_ENTRY(node, BaseTask, hashNode);
+    return owner->taskId;
+}
+
+static int KeyHash(const void *key)
+{
+    // The lookup key is a pointer to the int task id; the id itself is the hash.
+    return *(const int*)key;
+}
+
+static int NodeCompare(HashNode *node_1, HashNode *node_2) // compare callback: zero when both nodes belong to tasks with the same taskId
+{
+    BaseTask *lhs = HASHNODE_ENTRY(node_1, BaseTask, hashNode);
+    BaseTask *rhs = HASHNODE_ENTRY(node_2, BaseTask, hashNode);
+    return (lhs->taskId - rhs->taskId);
+}
- SetNoBlock(listenfd);
- SetCloseExec(listenfd);
- ret = listen(listenfd, LOOP_MAX_CLIENT);
- BEGET_ERROR_CHECK(ret >= 0, close(listenfd); return -1, "failed to listen socket. errno [%d]", errno);
- ret = chmod(PIPE_NAME, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
- BEGET_ERROR_CHECK(ret == 0, close(listenfd); return -1, "failed to chmod %s. errno [%d]", PIPE_NAME, errno);
+static int KeyCompare(HashNode *node, const void *key) // compare callback: zero when the node's task has the id that key points to
+{
+    BaseTask *owner = HASHNODE_ENTRY(node, BaseTask, hashNode);
+    if (owner != NULL) {
+        return (owner->taskId - *((int*)key));
+    }
+    BEGET_LOGE("%s, invalid task", __func__);
+    return -1;
+}
- return listenfd;
+static void NodeFree(HashNode *node) // free callback: close the owning task (when it has a close hook) and release it
+{
+    BaseTask *owner = HASHNODE_ENTRY(node, BaseTask, hashNode);
+    if (owner->close != NULL) {
+        owner->close(GetDefaultLoop(), owner);
+    }
+    free(owner);
}
-void ParamServerStart()
+static void ResourceInit() // filesystem preparation for the server socket
{
+    unlink(PIPE_NAME); // drop a stale socket file left behind by a previous run
+    // No chmod() here: PIPE_NAME was just unlinked, so its mode can only be set once bind() has re-created it.
MakeDirRecursive(PIPE_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
- int listenfd = CreateSocket();
- StartEpoll(listenfd);
+}
+
+static int InitDefaultLoopTab(EventLoop *defaultLoop) // create the task hash table and attach it to the loop; 0 on success, -1 on failure
+{
+    BEGET_ERROR_CHECK(defaultLoop != NULL, return -1, "%s : invalid loop", __func__);
+    HashInfo callbacks = {
+        .maxBucket = 128,
+        .nodeHash = NodeHash,
+        .keyHash = KeyHash,
+        .nodeCompare = NodeCompare,
+        .keyCompare = KeyCompare,
+        .nodeFree = NodeFree,
+    };
+    HashTab *table = NULL;
+    int err = HashTabCreate(&table, &callbacks);
+    BEGET_ERROR_CHECK(err == 0, return -1, "failed to create hash tab. errno [%d]", err);
+    defaultLoop->tab = table;
+    return 0;
+}
+
+int ParamServerInit(EventLoop *defaultLoop) // set up the parameter server on the given loop; 0 on success, -1 on failure
+{
+    BEGET_ERROR_CHECK(defaultLoop != NULL, return -1, "%s, invalid event loop", __func__);
+    // Order: filesystem preparation, then the task table, then the listening socket.
+    ResourceInit();
+
+    int ret = InitDefaultLoopTab(defaultLoop);
+    BEGET_ERROR_CHECK(ret == 0, return -1, "%s : failed to init loop", __func__);
+    ret = ParamServerCreate(defaultLoop);
+    BEGET_ERROR_CHECK(ret == 0, return -1, "%s : failed to create param server.", __func__);
+    return 0;
}
diff --git a/services/param_service/src/trie_comm.c b/services/param_service/src/trie_comm.c
index fd8184a..ac31243 100644
--- a/services/param_service/src/trie_comm.c
+++ b/services/param_service/src/trie_comm.c
@@ -14,20 +14,23 @@
#include <sys/mman.h>
#include <time.h>
#include <signal.h>
+#include <semaphore.h>
#include "trie_comm.h"
+#include "trie_queue.h"
#include "le_utils.h"
#include "param_utils.h"
#include "parameter.h"
#include "securec.h"
static TrieHeader* paramWorkSpace;
+static TrieNodeQueue trieQueue = {0};
+static sem_t dump_sem;
static pthread_rwlock_t rwlock;
-static pthread_mutex_t mtlock;
-static atomic_bool cnt;
-static atomic_bool waitCnt;
+static pthread_mutex_t queuelock;
+static atomic_int updateCnt = 0;
-uint32_t trie_alloc(char* name)
+static uint32_t trie_alloc(char* name)
{
BEGET_ERROR_CHECK(name != NULL, return 0, "invalid name");
uint32_t keySize = strlen(name) + 1;
@@ -48,7 +51,7 @@ uint32_t trie_alloc(char* name)
return nowOffset;
}
-uint32_t param_alloc(uint32_t size)
+static uint32_t param_alloc(uint32_t size)
{
uint32_t allocSize = PARAM_ALIGN(sizeof(ParamNode) + size);
uint32_t nowOffset = paramWorkSpace->currOffest;
@@ -61,41 +64,41 @@ uint32_t param_alloc(uint32_t size)
return nowOffset;
}
-TrieNode* GetRootNode()
+static TrieNode* GetRootNode()
{
BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return NULL, "failed");
return (paramWorkSpace->shareAddr + paramWorkSpace->rootOffest);
}
-TrieNode* GetTrieEntry(uint32_t index)
+static TrieNode* GetTrieEntry(uint32_t index)
{
BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index");
TrieNode* entry = paramWorkSpace->shareAddr + index;
return entry;
}
-ParamNode* GetParamEntry(uint32_t index)
+static ParamNode* GetParamEntry(uint32_t index)
{
BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index");
ParamNode* entry = paramWorkSpace->shareAddr + index;
return entry;
}
-ListNode* GetListNodeEntry(uint32_t index)
+static TrieListNode* GetTrieListNodeEntry(uint32_t index)
{
BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index");
- ListNode* entry = paramWorkSpace->shareAddr + index;;
+ TrieListNode* entry = paramWorkSpace->shareAddr + index;;
return entry;
}
-TrieNode* ListNodeGetTrieEntry(ListNode* node)
+static TrieNode* TrieListNodeGetTrieEntry(TrieListNode* node)
{
BEGET_ERROR_CHECK(node != NULL, return NULL, "invalid node");
TrieNode* entry = (TrieNode*)((char*)node - offsetof(TrieNode, node));
return entry;
}
-void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen)
+static void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen)
{
BEGET_ERROR_CHECK(remainKey != NULL, return, "invalid remainKey");
BEGET_ERROR_CHECK(subKey != NULL, return, "invalid subKey");
@@ -107,7 +110,7 @@ void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen)
}
}
-int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen)
+static int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen)
{
uint32_t nodeKeyLen = strlen(nodeKey);
if (nodeKeyLen > prefixKeyLen) {
@@ -118,7 +121,7 @@ int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen
return strncmp(nodeKey, prefixKey, prefixKeyLen);
}
-TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
+static TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
{
if (current == NULL || remainKey == NULL)
return NULL;
@@ -138,7 +141,7 @@ TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t pre
return FindSubTrieNode(subTrieNode, remainKey, prefixLen);
}
-TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
+static TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
{
if (current == NULL || remainKey == NULL)
return NULL;
@@ -170,10 +173,10 @@ TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t pref
return AddSubTrieNode(subTrieNode, remainKey, prefixLen);
}
-int CheckParamName(const char* name)
+static int CheckParamName(const char* name)
{
BEGET_ERROR_CHECK(name != NULL, return -1, "invalid parameter name");
- size_t nameLen = strlen(name);
+ int nameLen = (int)strlen(name);
if (name[0] == '.' || name[nameLen - 1] == '.')
return -1;
for (int i = 0; i < nameLen; ++i) {
@@ -191,145 +194,7 @@ int CheckParamName(const char* name)
return 0;
}
-int SetParamtoMem(const char* key, const char* value)
-{
- BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
- BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
- BEGET_ERROR_CHECK((strlen(key) > 0) && (strlen(key) <= PARAM_NAME_LEN_MAX), return -1, "invalid key len");
- BEGET_ERROR_CHECK((strlen(value) > 0) && (strlen(value) <= PARAM_VALUE_LEN_MAX), return -1, "invalid value len");
- BEGET_ERROR_CHECK(CheckParamName(key) == 0, return -1, "invalid parameter name");
-
- TrieNode* root = GetRootNode();
- TrieNode* current = GetRootNode();
- if (root == NULL || current == NULL)
- return -1;
-
- char* remainKey = (char *)key;
- pthread_rwlock_wrlock(&rwlock);
- while(1) {
- char* subKey;
- uint32_t prefixLen;
- GetSubKey(remainKey, &subKey, &prefixLen);
- if (current->child != 0) {
- current = AddSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
- BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not AddSubTrieNode");
- } else {
- char prefixKey[PARAM_NAME_LEN_MAX] = {0};
- (void)memcpy_s(prefixKey, PARAM_NAME_LEN_MAX, remainKey, prefixLen);
- current->child = trie_alloc(prefixKey);
- BEGET_ERROR_CHECK(current->child != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not alloc tire node");
- current = GetTrieEntry(current->child);
- BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get trie entry");
- }
- if (subKey == NULL) {
- if (current->dataIndex) {
- int ret = strncmp(key, CONST_PREFIX, strlen(CONST_PREFIX)) ;
- BEGET_ERROR_CHECK(ret != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not change the value of a constant parameter");
- ParamNode* saveParam = GetParamEntry(current->dataIndex);
- BEGET_ERROR_CHECK(saveParam != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
- (void)memcpy_s(saveParam->data + saveParam->keyLen + 1, PARAM_VALUE_LEN_MAX, value, strlen(value));
- saveParam->valueLen = strlen(value);
- break;
- }
- uint32_t allocSize = strlen(key) + PARAM_VALUE_LEN_MAX + 2;
- current->dataIndex = param_alloc(allocSize);
- ParamNode* saveParam = GetParamEntry(current->dataIndex);
- BEGET_ERROR_CHECK((current->dataIndex != 0) && (saveParam != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not alloc param or get param entry");
- sprintf(saveParam->data, "%s=%s", key, value);
- saveParam->keyLen = strlen(key);
- saveParam->valueLen = strlen(value);
-
- current->node.prev = root->node.prev;
- current->node.next = (void*)(&root->node) - paramWorkSpace->shareAddr;
- ListNode* rootPrevListNode = GetListNodeEntry(root->node.prev);
- TrieNode* rootPrevTrieNode = ListNodeGetTrieEntry(rootPrevListNode);
- BEGET_ERROR_CHECK((rootPrevListNode != NULL) && (rootPrevTrieNode != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not get list entry or get trie entry");
- rootPrevTrieNode->node.next = (void*)(&current->node) - paramWorkSpace->shareAddr;
- root->node.prev = (void*)(&current->node) - paramWorkSpace->shareAddr;
- break;
- }
- remainKey = subKey + 1;
- }
- atomic_store(&cnt, 1);
- atomic_store(&waitCnt, 1);
- pthread_rwlock_unlock(&rwlock);
- return 0;
-}
-
-int GetParamFromMem(const char* key, char* value, uint32_t len)
-{
- BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
- BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
-
- TrieNode* current = GetRootNode();
- if (current == NULL)
- return -1;
-
- ParamNode* paramData;
- char* remainKey = (char *)key;
- pthread_rwlock_rdlock(&rwlock);
- while (1) {
- char* subKey;
- uint32_t prefixLen;
- GetSubKey(remainKey, &subKey, &prefixLen);
- current = FindSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
- BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not find sub trie node : %s", key);
- if (subKey == NULL) {
- paramData = GetParamEntry(current->dataIndex);
- BEGET_ERROR_CHECK(paramData != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
- break;
- }
- remainKey = subKey + 1;
- }
-
- if (len > paramData->valueLen) {
- (void)memcpy_s(value, PARAM_VALUE_LEN_MAX, paramData->data + paramData->keyLen + 1, paramData->valueLen);
- value[paramData->valueLen] = '\0';
- } else {
- (void)memcpy_s(value, len, paramData->data + paramData->keyLen + 1, len);
- value[len] = '\0';
- }
- pthread_rwlock_unlock(&rwlock);
- return 0;
-}
-
-int WaitParam(const char* key, const char* value, uint32_t timeout)
-{
- BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
- BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
- int ret;
- char tmp[PARAM_VALUE_LEN_MAX] = {0};
- ret = GetParamFromMem(key, tmp, sizeof(tmp));
- if (ret == 0) {
- if (strncmp(value, "*", strlen(value)) == 0) {
- return 0;
- }
- if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) {
- return 0;
- }
- bzero(tmp, sizeof(tmp));
- }
- while (timeout != 0) {
- if (atomic_load(&waitCnt)) {
- atomic_store(&waitCnt, 0);
- ret = GetParamFromMem(key, tmp, sizeof(tmp));
- if (ret == 0) {
- if (strlen(tmp) == 1 && strncmp(value, "*", 1) == 0) {
- return 0;
- }
- if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) {
- return 0;
- }
- bzero(tmp, sizeof(tmp));
- }
- }
- --timeout;
- sleep(1);
- }
- return -1;
-}
-
-void WritetoDisk(TrieNode* node, FILE* fp)
+static void WritetoDisk(TrieNode* node, FILE* fp)
{
BEGET_ERROR_CHECK(node != NULL, return, "invalid node");
BEGET_ERROR_CHECK(fp != NULL, return, "invalid file descriptor ");
@@ -344,57 +209,125 @@ void WritetoDisk(TrieNode* node, FILE* fp)
fputs(buf, fp);
}
-void DumpParam()
+static void FullWrite() // rewrite USER_PARAM_FILE from scratch by dumping every node on the parameter list
+{
+    pthread_mutex_lock(&queuelock); // the full dump makes queued entries redundant, so the queue is emptied
+    while (trieQueue.size > 0) {
+        trieQueue.pop(&trieQueue);
+    }
+    pthread_mutex_unlock(&queuelock);
+    unlink(USER_PARAM_FILE);
+    FILE* fp = fopen(USER_PARAM_FILE, "w+");
+    BEGET_ERROR_CHECK(fp != NULL, return, "failed to open %s", USER_PARAM_FILE);
+    TrieNode* root = GetRootNode();
+    BEGET_ERROR_CHECK(root != NULL, fclose(fp); return, "can not get root node or get list entry");
+    TrieListNode* current = GetTrieListNodeEntry(root->node.next);
+    while (current != NULL && current != &root->node) { // NULL: the link offset was rejected by GetTrieListNodeEntry
+        TrieNode* trienode = TrieListNodeGetTrieEntry(current);
+        WritetoDisk(trienode, fp);
+        current = GetTrieListNodeEntry(current->next);
+    }
+    fclose(fp);
+}
+
+static void AppendWrite() // append every queued node to USER_PARAM_FILE; the queue is left untouched if the file cannot be opened
{
- if (atomic_load(&cnt)) {
- pthread_mutex_lock(&mtlock);
- atomic_store(&cnt, 0);
- unlink(USER_PARAM_FILE);
- FILE* fp = fopen(USER_PARAM_FILE, "a+");
- TrieNode* root = GetRootNode();
- ListNode* current = GetListNodeEntry(root->node.next);
- BEGET_ERROR_CHECK((root != NULL) && (current != NULL), pthread_mutex_unlock(&mtlock); fclose(fp); return, "can not get root node or get list entry");
- while (current != &root->node) {
- TrieNode* trienode = ListNodeGetTrieEntry(current);
+    FILE* fp = fopen(USER_PARAM_FILE, "a+");
+    BEGET_ERROR_CHECK(fp != NULL, return, "failed to open %s", USER_PARAM_FILE);
+    pthread_mutex_lock(&queuelock);
+    for (TrieNode* trienode = NULL; trieQueue.size > 0; ) {
+        if ((trienode = trieQueue.pop(&trieQueue)) != NULL) {
WritetoDisk(trienode, fp);
- current = GetListNodeEntry(current->next);
}
- fclose(fp);
- pthread_mutex_unlock(&mtlock);
}
+    pthread_mutex_unlock(&queuelock);
+    fclose(fp);
}
-void ProcessParamFile(char* fileName)
+static void DumpParam() // body of the persistence thread; loops forever
+{
+    FullWrite(); // during the init stage every key is stored exactly once
+    while (1) {
+        sem_wait(&dump_sem);
+        while (sem_trywait(&dump_sem) == 0) { } // fold queued posts into this pass; sem_init() on a live semaphore is undefined
+        /*
+         Parameters with the persist prefix are saved persistently. While the accumulated update count is below the threshold (50 here), the changed or added entries are taken from the queue and appended to the file, for example:
+         file content after the first set:
+         persist.openeuler.version=22.03
+         file content after the second set:
+         persist.openeuler.version=22.03
+         persist.openeuler.version=23.03
+         Once the threshold is reached the queue is no longer consulted; the whole workspace is traversed and the file is rewritten, so every key is stored only once, for example:
+         file content after the first set:
+         persist.openeuler.version=22.03
+         file content after the second set:
+         persist.openeuler.version=23.03
+         */
+        if (atomic_load(&updateCnt) >= 50) {
+            atomic_store(&updateCnt, 0);
+            FullWrite();
+        } else {
+            AppendWrite();
+        }
+    }
+}
+
+static void ProcessParamFile(char* fileName) // load every "key = value" line of fileName into the workspace; '#' lines are comments
{
BEGET_ERROR_CHECK(access(fileName, F_OK) == 0, perror("error"); return, "failed to access %s", fileName);
FILE* fp = fopen(fileName, "r");
BEGET_ERROR_CHECK(fp != NULL, return, "failed to open %s", fileName);
- char buf[PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX];
- bzero(buf, sizeof(buf));
- while (fgets(buf, sizeof(buf), fp) != NULL) {
- buf[strlen(buf) - 1] = '\0';
- if (*buf == '#')
+    char *buf = (char*)calloc(1, PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX);
+    BEGET_ERROR_CHECK(buf != NULL, fclose(fp); return, "failed to allocate line buffer");
+    while (fgets(buf, PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX, fp) != NULL) {
+        char *line = buf; // cursor only: buf must keep pointing at the allocation for the next fgets() and for free()
+        line[strcspn(line, "\r\n")] = '\0'; // cut at the line break, if there is one
+        if (*line == '#')
continue;
- char *sep = buf;
- char *key = NULL;
+        // Skip the line beginning spaces
+        while (isspace((unsigned char)*line)) {
+            line++;
+        }
+        // Skip the spaces at the end of line
+        size_t len = strlen(line);
+        while (len > 0 && isspace((unsigned char)line[len - 1])) {
+            line[len - 1] = '\0';
+            len--;
+        }
+
+        if (*line == '\0')
+            continue;
+
+        char *sep = line;
+        char *key = sep;
char *value = NULL;
while (*sep != '\0') {
+            if (isspace((unsigned char)*sep)) {
+                *sep = '\0';
+            }
if (*sep == '=') {
*sep = '\0';
value = sep + 1;
- key = buf;
break;
}
- ++sep;
+            sep++;
+        }
+
+        if (value) {
+            // Skip the value beginning spaces
+            while (isspace((unsigned char)*value)) {
+                value++;
+            }
+            if (*value == '\0')
+                continue;
+            SetParamtoMem(key, value);
}
- if (key) {
- SetParamtoMem(key, value);
- }
}
+    free(buf);
fclose(fp);
}
-void ReadFileInDir(char* dir, char* postfix)
+static void ReadFileInDir(char* dir, char* postfix)
{
BEGET_ERROR_CHECK((dir != NULL) && (postfix != NULL), return, "invalid directory");
DIR* pDir = opendir(dir);
@@ -417,7 +350,7 @@ void ReadFileInDir(char* dir, char* postfix)
closedir(pDir);
}
-void LoadParam(char* dir)
+static void LoadParam(char* dir)
{
BEGET_ERROR_CHECK(dir != NULL, return, "invalid directory");
struct stat st;
@@ -428,35 +361,16 @@ void LoadParam(char* dir)
}
// 定时持久化数据
-void CreateParamListener()
+static int CreateParamListener() // start the detached DumpParam thread; 0 on success, otherwise the pthread_create() error code
{
- atomic_init(&cnt, 0);
- struct sigevent sigev;
- bzero(&sigev, sizeof(struct sigevent));
- sigev.sigev_notify = SIGEV_THREAD;
- sigev.sigev_notify_function = DumpParam;
- sigev.sigev_notify_attributes = NULL;
-
- timer_t timerId;
- if (timer_create(CLOCK_REALTIME, &sigev, &timerId) != 0) {
- perror("timer_create:");
- exit(EXIT_FAILURE);
- }
-
- struct itimerspec value;
- bzero(&value, sizeof(struct itimerspec));
- value.it_value.tv_sec = 1;
- value.it_value.tv_nsec = 0;
- value.it_interval.tv_sec = 1;
- value.it_interval.tv_nsec = 0;
-
- if (timer_settime(timerId, 0, &value, NULL) != 0) {
- perror("timer_settime:");
- exit(EXIT_FAILURE);
- }
+    pthread_t dumpThread;
+    int err = pthread_create(&dumpThread, NULL, (void*)DumpParam, NULL);
+    BEGET_ERROR_CHECK(err == 0, return err, "failed to create param listener");
+    pthread_detach(dumpThread); // the thread is never joined
+    return 0;
}
-void InitRootNode()
+static void InitRootNode()
{
BEGET_ERROR_CHECK(paramWorkSpace != NULL, return, "invalid paramWorkSpace");
TrieNode* rootNode = paramWorkSpace->shareAddr + trie_alloc("#");
@@ -469,16 +383,16 @@ int ParamWorkSpaceInit()
MakeDirRecursive(SYSTEM_PARAM_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
MakeDirRecursive(USER_PARAM_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
MakeDirRecursive(WORKSPACE_DIR, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
+
+ sem_init(&dump_sem, 0, 0);
pthread_rwlock_init(&rwlock, NULL);
- pthread_mutex_init(&mtlock, NULL);
- atomic_init(&waitCnt, 0);
+ pthread_mutex_init(&queuelock, NULL);
paramWorkSpace = (TrieHeader*)malloc(sizeof(TrieHeader));
BEGET_ERROR_CHECK(paramWorkSpace != NULL, return -1, "failed to malloc for param workspace");
int fd = open(WORKSPACE_NAME, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
BEGET_ERROR_CHECK(fd > 0, return -1, "failed to open %s", WORKSPACE_NAME);
- int ret = ftruncate(fd, WORKSPACE_SIZE);
- (void)ret;
+ ftruncate(fd, WORKSPACE_SIZE);
paramWorkSpace->shareAddr = mmap(NULL, WORKSPACE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
BEGET_ERROR_CHECK(paramWorkSpace->shareAddr != MAP_FAILED, return -1, "failed to create mmap");
paramWorkSpace->rootOffest = 0;
@@ -486,8 +400,142 @@ int ParamWorkSpaceInit()
paramWorkSpace->trieSize = 0;
paramWorkSpace->paramSize = 0;
InitRootNode();
+
+ // before LoadParam
+ TrieQueueFirstStageInit(&trieQueue);
LoadParam(SYSTEM_PARAM_PATH);
LoadParam(USER_PARAM_PATH);
- CreateParamListener();
+ // behind LoadParam
+ TrieQueueSecondStageInit(&trieQueue);
+
+ return CreateParamListener();
+}
+
+int SetParamtoMem(const char* key, const char* value) // add or update key=value in the workspace; 0 on success, -1 on failure
+{
+    BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
+    BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
+    BEGET_ERROR_CHECK((strlen(key) > 0) && (strlen(key) <= PARAM_NAME_LEN_MAX), return -1, "invalid key len");
+    BEGET_ERROR_CHECK((strlen(value) > 0) && (strlen(value) <= PARAM_VALUE_LEN_MAX), return -1, "invalid value len");
+    BEGET_ERROR_CHECK(CheckParamName(key) == 0, return -1, "invalid parameter name");
+
+    TrieNode* root = GetRootNode();
+    TrieNode* current = GetRootNode();
+    if (root == NULL || current == NULL)
+        return -1;
+
+    const char* remainKey = key;
+    pthread_rwlock_wrlock(&rwlock);
+    while(1) { // one iteration per dot-separated key segment
+        char* subKey;
+        uint32_t prefixLen;
+        GetSubKey(remainKey, &subKey, &prefixLen);
+        if (current->child != 0) {
+            current = AddSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
+            BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not AddSubTrieNode");
+        } else {
+            char prefixKey[PARAM_NAME_LEN_MAX + 1] = {0}; // +1 keeps a terminator even for a segment of maximum length
+            (void)memcpy_s(prefixKey, PARAM_NAME_LEN_MAX, remainKey, prefixLen);
+            current->child = trie_alloc(prefixKey);
+            BEGET_ERROR_CHECK(current->child != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not alloc tire node");
+            current = GetTrieEntry(current->child);
+            BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get trie entry");
+        }
+        if (subKey == NULL) {
+            if (current->dataIndex) { // Param update
+                int ret = strncmp(key, CONST_PREFIX, strlen(CONST_PREFIX)) ;
+                BEGET_ERROR_CHECK(ret != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not change the value of a constant parameter");
+                ParamNode* saveParam = GetParamEntry(current->dataIndex);
+                BEGET_ERROR_CHECK(saveParam != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
+                (void)memcpy_s(saveParam->data + saveParam->keyLen + 1, PARAM_VALUE_LEN_MAX + 1, value, strlen(value) + 1); // terminator included: no tail of a longer old value survives
+                saveParam->valueLen = strlen(value);
+                atomic_fetch_add_explicit(&updateCnt, 1, memory_order_relaxed);
+                break;
+            } else { // Param add
+                uint32_t allocSize = strlen(key) + PARAM_VALUE_LEN_MAX + 2; // key + '=' + longest value + terminator
+                current->dataIndex = param_alloc(allocSize);
+                ParamNode* saveParam = GetParamEntry(current->dataIndex);
+                BEGET_ERROR_CHECK((current->dataIndex != 0) && (saveParam != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not alloc param or get param entry");
+                sprintf(saveParam->data, "%s=%s", key, value);
+                saveParam->keyLen = strlen(key);
+                saveParam->valueLen = strlen(value);
+
+                current->node.prev = root->node.prev; // link the new node in just before root, i.e. at the list tail
+                current->node.next = (void*)(&root->node) - paramWorkSpace->shareAddr;
+                TrieListNode* rootPrevTrieListNode = GetTrieListNodeEntry(root->node.prev);
+                TrieNode* rootPrevTrieNode = TrieListNodeGetTrieEntry(rootPrevTrieListNode);
+                BEGET_ERROR_CHECK((rootPrevTrieListNode != NULL) && (rootPrevTrieNode != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not get list entry or get trie entry");
+                rootPrevTrieNode->node.next = (void*)(&current->node) - paramWorkSpace->shareAddr;
+                root->node.prev = (void*)(&current->node) - paramWorkSpace->shareAddr;
+                break;
+            }
+        }
+        remainKey = subKey + 1;
+    }
+
+    if (strncmp(key, PARAM_PERSIST_PREFIX, strlen(PARAM_PERSIST_PREFIX)) == 0) { // persist keys are queued for the dump thread
+        pthread_mutex_lock(&queuelock);
+        trieQueue.push(&trieQueue, current);
+        pthread_mutex_unlock(&queuelock);
+        sem_post(&dump_sem);
+    }
+    pthread_rwlock_unlock(&rwlock);
+    return 0;
+}
+
+int GetParamFromMem(const char* key, char* value, uint32_t len) // copy the value stored under key into value[len], truncating if needed; 0 on success, -1 otherwise
+{
+    BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
+    BEGET_ERROR_CHECK((key != NULL ) && (value != NULL) && (len > 0), return -1, "invalid key or value");
+
+    TrieNode* current = GetRootNode();
+    if (current == NULL)
+        return -1;
+
+    ParamNode* paramData;
+    const char* remainKey = key;
+    pthread_rwlock_rdlock(&rwlock);
+    while (1) {
+        char* subKey;
+        uint32_t prefixLen;
+        GetSubKey(remainKey, &subKey, &prefixLen);
+        current = FindSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
+        BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not find sub trie node : %s", key);
+        if (subKey == NULL) {
+            // A dataIndex of 0 means no parameter has been stored on this trie node.
+            BEGET_ERROR_CHECK(current->dataIndex != 0, pthread_rwlock_unlock(&rwlock); return -1, "no value stored for : %s", key);
+            paramData = GetParamEntry(current->dataIndex);
+            BEGET_ERROR_CHECK(paramData != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
+            break;
+        }
+        remainKey = subKey + 1;
+    }
+
+    // At most len - 1 bytes are copied so that the terminator always lands inside the caller's buffer.
+    uint32_t copyLen = (len > paramData->valueLen) ? paramData->valueLen : (len - 1);
+    (void)memcpy_s(value, len, paramData->data + paramData->keyLen + 1, copyLen);
+    value[copyLen] = '\0';
+
+    pthread_rwlock_unlock(&rwlock);
+    return 0;
+}
+
+int WaitParam(const char* key, const char* value, uint32_t timeout) // 0 when key currently holds value (or value is "*" and key exists), otherwise -1
+{
+    BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
+    BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
+    (void)timeout; // no waiting happens in this function; it only checks the current value
+
+    // One spare byte so that a value of the maximum length still fits together with its terminator.
+    char tmp[PARAM_VALUE_LEN_MAX + 1] = {0};
+    if (GetParamFromMem(key, tmp, sizeof(tmp)) != 0) {
+        return -1;
+    }
+
+    // "*" accepts any stored value; full-string comparison keeps an empty value from matching everything.
+    if (strcmp(value, "*") == 0 || strcmp(value, tmp) == 0) {
+        return 0;
+    }
+
+    return -1;
+}
diff --git a/services/utils/BUILD.gn b/services/utils/BUILD.gn
index e5f6a96..30529be 100755
--- a/services/utils/BUILD.gn
+++ b/services/utils/BUILD.gn
@@ -10,24 +10,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import("//build/config/sysroot.gni")
config("exported_header_files") {
visibility = [ ":*" ]
include_dirs = [
"//base/startup/init/services/include",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
]
}
import("//build/ohos.gni")
ohos_static_library("libinit_utils") {
- sources = [ "init_utils.c" ]
+ sources = [
+ "init_utils.c",
+ "list.c"
+ ]
public_configs = [ ":exported_header_files" ]
include_dirs = [
"//base/startup/init/interfaces/innerkits/include",
"//third_party/bounds_checking_function/include",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
+ "//base/startup/init/services/include"
]
deps = [
"//base/hiviewdfx/hilog/interfaces/native/innerkits:libhilog",
diff --git a/services/utils/init_utils.c b/services/utils/init_utils.c
index 8b4b2e0..733f863 100644
--- a/services/utils/init_utils.c
+++ b/services/utils/init_utils.c
@@ -49,7 +49,7 @@ float ConvertMicrosecondToSecond(int x)
}
#ifndef __LITEOS_M__
-__attribute__((unused)) static bool CheckDigit(const char *name)
+static bool CheckDigit(const char *name)
{
size_t nameLen = strlen(name);
for (size_t i = 0; i < nameLen; ++i) {
diff --git b/services/utils/list.c b/services/utils/list.c
new file mode 100644
index 0000000..2ef1ad5
--- /dev/null
+++ b/services/utils/list.c
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 Huawei Device Co., Ltd.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "list.h"
+
+#include <stddef.h>
+#include <stdlib.h>
+
+void OH_ListAddTail(struct ListNode *head, struct ListNode *item)
+{
+    /* Link item in between head's current predecessor and head; a NULL head or item is ignored. */
+    if ((head != NULL) && (item != NULL)) {
+        item->prev = head->prev;
+        item->next = head;
+        item->prev->next = item;
+        head->prev = item;
+    }
+}
+
+void OH_ListRemove(struct ListNode *item)
+{
+    /* Unlink item by pointing its two neighbours at each other; item's own next/prev are not modified. */
+    if (item != NULL) {
+        item->prev->next = item->next;
+        item->next->prev = item->prev;
+    }
+}
\ No newline at end of file