eggo/0008-refactor-node-mananger.patch
zhangxiaoyu 0777151d5d upgrade to v0.9.1-1
Signed-off-by: zhangxiaoyu <zhangxiaoyu58@huawei.com>
2021-08-03 15:28:17 +08:00

419 lines
15 KiB
Diff

From 52f415c3888b9bd0484ba5f66b2e987e41d4f9fb Mon Sep 17 00:00:00 2001
From: haozi007 <liuhao27@huawei.com>
Date: Mon, 2 Aug 2021 07:24:24 +0100
Subject: [PATCH 8/9] refactor node manager
1. add a timeout when running a task;
2. show the task history of each node;
Signed-off-by: haozi007 <liuhao27@huawei.com>
---
.../binary/addons/runner_addons.go | 3 +-
pkg/clusterdeployment/binary/binary.go | 7 +-
.../binary/cleanupcluster/cleanupetcd.go | 6 +-
.../cleanupcluster/cleanuploadbalance.go | 3 +-
.../binary/cleanupcluster/cleanupnode.go | 6 +-
.../binary/coredns/binarycoredns.go | 3 +-
.../binary/coredns/podcoredns.go | 3 +-
.../binary/etcdcluster/etcdreconfig.go | 3 +-
.../binary/infrastructure/infrastructure.go | 3 +-
.../binary/network/network.go | 3 +-
pkg/utils/nodemanager/node.go | 90 ++++++++++++++-----
pkg/utils/task/task.go | 10 +++
12 files changed, 93 insertions(+), 47 deletions(-)
diff --git a/pkg/clusterdeployment/binary/addons/runner_addons.go b/pkg/clusterdeployment/binary/addons/runner_addons.go
index f8c0dad..e5bc0ad 100644
--- a/pkg/clusterdeployment/binary/addons/runner_addons.go
+++ b/pkg/clusterdeployment/binary/addons/runner_addons.go
@@ -122,7 +122,7 @@ func cleanupAddons(cluster *api.ClusterConfig) error {
yamlPath := filepath.Join(cluster.PackageSrc.GetPkgDstPath(), constants.DefaultFilePath)
kubeconfig := filepath.Join(cluster.GetConfigDir(), constants.KubeConfigFileNameAdmin)
- t := task.NewTaskInstance(&CleanupAddonsTask{
+ t := task.NewTaskIgnoreErrInstance(&CleanupAddonsTask{
yaml: yaml,
srcPath: yamlPath,
kubeconfig: kubeconfig,
@@ -134,7 +134,6 @@ func cleanupAddons(cluster *api.ClusterConfig) error {
}
}
- task.SetIgnoreErrorFlag(t)
useMaster, err := nodemanager.RunTaskOnOneNode(t, masters)
if err != nil {
return err
diff --git a/pkg/clusterdeployment/binary/binary.go b/pkg/clusterdeployment/binary/binary.go
index dd260b0..f901643 100644
--- a/pkg/clusterdeployment/binary/binary.go
+++ b/pkg/clusterdeployment/binary/binary.go
@@ -102,12 +102,11 @@ func (bcp *BinaryClusterDeployment) registerNodes() error {
defer func() {
if err != nil {
bcp.Finish()
- nodemanager.UnRegisterAllNodes()
}
}()
for _, cfg := range bcp.config.Nodes {
- err := bcp.registerNode(cfg)
+ err = bcp.registerNode(cfg)
if err != nil {
return err
}
@@ -408,9 +407,7 @@ func (bcp *BinaryClusterDeployment) Finish() {
logrus.Info("do finish binary deployment...")
bcp.connLock.Lock()
defer bcp.connLock.Unlock()
- for _, c := range bcp.connections {
- c.Close()
- }
+ nodemanager.UnRegisterAllNodes()
bcp.connections = make(map[string]runner.Runner)
logrus.Info("do finish binary deployment success")
}
diff --git a/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go b/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go
index 81e1fe6..ed79246 100644
--- a/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go
+++ b/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go
@@ -84,13 +84,12 @@ func CleanupEtcdMember(conf *api.ClusterConfig, hostconfig *api.HostConfig) erro
}
// cleanup remains
- taskCleanupEtcdMember := task.NewTaskInstance(
+ taskCleanupEtcdMember := task.NewTaskIgnoreErrInstance(
&cleanupEtcdMemberTask{
ccfg: conf,
},
)
- task.SetIgnoreErrorFlag(taskCleanupEtcdMember)
if err := nodemanager.RunTaskOnNodes(taskCleanupEtcdMember, []string{hostconfig.Address}); err != nil {
return fmt.Errorf("run task for cleanup etcd member failed: %v", err)
}
@@ -114,14 +113,13 @@ func CleanupAllEtcds(conf *api.ClusterConfig) error {
}
// cleanup remains
- taskCleanupAllEtcds := task.NewTaskInstance(
+ taskCleanupAllEtcds := task.NewTaskIgnoreErrInstance(
&cleanupEtcdMemberTask{
ccfg: conf,
},
)
nodes := utils.GetAllIPs(conf.EtcdCluster.Nodes)
- task.SetIgnoreErrorFlag(taskCleanupAllEtcds)
if err := nodemanager.RunTaskOnNodes(taskCleanupAllEtcds, nodes); err != nil {
return fmt.Errorf("run task for cleanup all etcds failed: %v", err)
}
diff --git a/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go b/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go
index 010ef84..d836f99 100644
--- a/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go
+++ b/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go
@@ -56,13 +56,12 @@ func (t *cleanupLoadBalanceTask) Run(r runner.Runner, hostConfig *api.HostConfig
}
func CleanupLoadBalance(conf *api.ClusterConfig, lb *api.HostConfig) error {
- taskCleanupLoadBalance := task.NewTaskInstance(
+ taskCleanupLoadBalance := task.NewTaskIgnoreErrInstance(
&cleanupLoadBalanceTask{
ccfg: conf,
},
)
- task.SetIgnoreErrorFlag(taskCleanupLoadBalance)
if err := nodemanager.RunTaskOnNodes(taskCleanupLoadBalance, []string{lb.Address}); err != nil {
return fmt.Errorf("run task for cleanup loadbalance failed: %v", err)
}
diff --git a/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go b/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go
index da0488a..9d2d009 100644
--- a/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go
+++ b/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go
@@ -218,7 +218,7 @@ func (t *removeWorkerTask) Run(r runner.Runner, hostConfig *api.HostConfig) erro
}
func execRemoveWorkerTask(conf *api.ClusterConfig, hostconfig *api.HostConfig) error {
- taskRemoveWorker := task.NewTaskInstance(
+ taskRemoveWorker := task.NewTaskIgnoreErrInstance(
&removeWorkerTask{
ccfg: conf,
workerName: hostconfig.Name,
@@ -230,7 +230,6 @@ func execRemoveWorkerTask(conf *api.ClusterConfig, hostconfig *api.HostConfig) e
return fmt.Errorf("failed to get first master")
}
- task.SetIgnoreErrorFlag(taskRemoveWorker)
if err := nodemanager.RunTaskOnNodes(taskRemoveWorker, []string{master}); err != nil {
return err
}
@@ -249,14 +248,13 @@ func CleanupNode(conf *api.ClusterConfig, hostconfig *api.HostConfig, delType ui
}
}
- taskCleanupNode := task.NewTaskInstance(
+ taskCleanupNode := task.NewTaskIgnoreErrInstance(
&cleanupNodeTask{
ccfg: conf,
delType: delType,
},
)
- task.SetIgnoreErrorFlag(taskCleanupNode)
if err := nodemanager.RunTaskOnNodes(taskCleanupNode, []string{hostconfig.Address}); err != nil {
return fmt.Errorf("run task for cleanup cluster failed: %v", err)
}
diff --git a/pkg/clusterdeployment/binary/coredns/binarycoredns.go b/pkg/clusterdeployment/binary/coredns/binarycoredns.go
index db9bcd5..411dba2 100644
--- a/pkg/clusterdeployment/binary/coredns/binarycoredns.go
+++ b/pkg/clusterdeployment/binary/coredns/binarycoredns.go
@@ -362,14 +362,13 @@ func (bc *BinaryCoredns) Cleanup(cluster *api.ClusterConfig) error {
logrus.Warn("no master host found, can not cleanup coredns service")
return nil
}
- sst := task.NewTaskInstance(
+ sst := task.NewTaskIgnoreErrInstance(
&BinaryCorednsCleanupTask{
Cluster: cluster,
cleanYaml: true,
},
)
- task.SetIgnoreErrorFlag(sst)
err := nodemanager.RunTaskOnNodes(sst, masterIPs)
if err != nil {
logrus.Warnf("run cleanup coredns task failed: %v", err)
diff --git a/pkg/clusterdeployment/binary/coredns/podcoredns.go b/pkg/clusterdeployment/binary/coredns/podcoredns.go
index c27f1d1..28ae908 100644
--- a/pkg/clusterdeployment/binary/coredns/podcoredns.go
+++ b/pkg/clusterdeployment/binary/coredns/podcoredns.go
@@ -150,7 +150,7 @@ func (pc *PodCoredns) Cleanup(cluster *api.ClusterConfig) error {
if cluster == nil {
return fmt.Errorf("invalid cluster config")
}
- t := task.NewTaskInstance(&PodCorednsCleanupTask{Cluster: cluster})
+ t := task.NewTaskIgnoreErrInstance(&PodCorednsCleanupTask{Cluster: cluster})
var masters []string
for _, n := range cluster.Nodes {
if (n.Type & api.Master) != 0 {
@@ -158,7 +158,6 @@ func (pc *PodCoredns) Cleanup(cluster *api.ClusterConfig) error {
}
}
- task.SetIgnoreErrorFlag(t)
useMaster, err := nodemanager.RunTaskOnOneNode(t, masters)
if err != nil {
return err
diff --git a/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go b/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go
index b417861..aadc4a8 100644
--- a/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go
+++ b/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go
@@ -196,13 +196,12 @@ func (t *removeEtcdsTask) Run(r runner.Runner, hostConfig *api.HostConfig) error
}
func execRemoveEtcdsTask(conf *api.ClusterConfig, node string) error {
- taskRemoveEtcds := task.NewTaskInstance(
+ taskRemoveEtcds := task.NewTaskIgnoreErrInstance(
&removeEtcdsTask{
ccfg: conf,
},
)
- task.SetIgnoreErrorFlag(taskRemoveEtcds)
if err := nodemanager.RunTaskOnNodes(taskRemoveEtcds, []string{node}); err != nil {
logrus.Errorf("run task for remove etcds failed: %v", err)
return err
diff --git a/pkg/clusterdeployment/binary/infrastructure/infrastructure.go b/pkg/clusterdeployment/binary/infrastructure/infrastructure.go
index bf6ed3a..407c4a8 100644
--- a/pkg/clusterdeployment/binary/infrastructure/infrastructure.go
+++ b/pkg/clusterdeployment/binary/infrastructure/infrastructure.go
@@ -419,13 +419,12 @@ func NodeInfrastructureDestroy(config *api.ClusterConfig, hostconfig *api.HostCo
return fmt.Errorf("do not register %d roleinfra", hostconfig.Type)
}
- itask := task.NewTaskInstance(
+ itask := task.NewTaskIgnoreErrInstance(
&DestroyInfraTask{
packageSrc: &config.PackageSrc,
roleInfra: roleInfra,
})
- task.SetIgnoreErrorFlag(itask)
if err := nodemanager.RunTaskOnNodes(itask, []string{hostconfig.Address}); err != nil {
return fmt.Errorf("destroy infrastructure Task failed: %v", err)
}
diff --git a/pkg/clusterdeployment/binary/network/network.go b/pkg/clusterdeployment/binary/network/network.go
index e1cd73f..0c35a9c 100644
--- a/pkg/clusterdeployment/binary/network/network.go
+++ b/pkg/clusterdeployment/binary/network/network.go
@@ -121,7 +121,7 @@ func CleanupNetwork(cluster *api.ClusterConfig) error {
if cluster == nil {
return fmt.Errorf("invalid cluster config")
}
- t := task.NewTaskInstance(&CleanupNetworkTask{Cluster: cluster})
+ t := task.NewTaskIgnoreErrInstance(&CleanupNetworkTask{Cluster: cluster})
var masters []string
for _, n := range cluster.Nodes {
if (n.Type & api.Master) != 0 {
@@ -129,7 +129,6 @@ func CleanupNetwork(cluster *api.ClusterConfig) error {
}
}
- task.SetIgnoreErrorFlag(t)
useMaster, err := nodemanager.RunTaskOnOneNode(t, masters)
if err != nil {
return err
diff --git a/pkg/utils/nodemanager/node.go b/pkg/utils/nodemanager/node.go
index a19d2e7..08af098 100644
--- a/pkg/utils/nodemanager/node.go
+++ b/pkg/utils/nodemanager/node.go
@@ -17,6 +17,7 @@ package nodemanager
import (
"fmt"
+ "strings"
"sync"
"time"
@@ -54,6 +55,12 @@ func (ns NodeStatus) ShowCounts() string {
return fmt.Sprintf("{ total: %d, success: %d, fail: %d, ignore: %d }", ns.TaskTotalCnt, ns.TaskSuccessCnt, ns.TaskFailCnt, ns.TaskIgnoreCnt)
}
+type taskSummary struct {
+	name    string        // task name, as returned by t.Name()
+	useTime time.Duration // elapsed time of the task run
+	status  string        // "success", "ignore err: <err>", or the error text
+}
+
type Node struct {
host *api.HostConfig
r runner.Runner
@@ -62,6 +69,32 @@ type Node struct {
queue chan task.Task
lock sync.RWMutex
status NodeStatus
+
+ tasksHistory []taskSummary
+}
+
+// addHistory appends the outcome of one run of t to the node's task history.
+func (n *Node) addHistory(t task.Task, err error, useTime time.Duration) {
+	ts := taskSummary{name: t.Name(), useTime: useTime, status: "success"}
+	switch {
+	case err == nil: // status stays "success"
+	case task.IsIgnoreError(t):
+		ts.status = fmt.Sprintf("ignore err: %v", err)
+	default:
+		ts.status = err.Error()
+	}
+	n.tasksHistory = append(n.tasksHistory, ts)
+}
+
+// ShowTaskList returns a printable report of every task run on this node.
+func (n *Node) ShowTaskList() string {
+	var sb strings.Builder
+	fmt.Fprintf(&sb, "\n##################tasks on node: %s#################\n", n.host.Name)
+	for _, ts := range n.tasksHistory {
+		fmt.Fprintf(&sb, "name: %s, elapsed time: %s, message: %s\n", ts.name, ts.useTime, ts.status)
+	}
+	sb.WriteString("#########################################\n")
+	return sb.String()
}
func (n *Node) GetStatus() NodeStatus {
@@ -135,7 +168,55 @@ func (n *Node) PushTask(t task.Task) bool {
 func (n *Node) Finish() {
 	n.stop <- true
 	n.r.Close()
-	logrus.Infof("node: %s is finished", n.host.Address)
+	// Info, not Infof: the report may contain '%' from task names or errors.
+	logrus.Info(n.ShowTaskList())
+}
+
+// doRunTask runs t on node n, bounded by a fixed timeout, and records the
+// outcome in the task labels, the node status and the node's task history.
+func doRunTask(n *Node, t task.Task) {
+	start := time.Now()
+	// t.Run must execute in its own goroutine: the operands of a select are
+	// evaluated before it blocks, so `case ec <- t.Run(...)` would run the
+	// task to completion before the timer was even started.
+	// The channel is buffered so the goroutine can always deliver its result
+	// and exit, even after the timeout fired and nobody is receiving.
+	// NOTE(review): Run gets no cancellation signal, so a timed-out task
+	// keeps running on n.r in the background.
+	echan := make(chan error, 1)
+	go func() {
+		echan <- t.Run(n.r, n.host)
+	}()
+
+	var err error
+	// TODO: maybe we need get timeout from task
+	select {
+	case err = <-echan:
+	case <-time.After(time.Second * 300):
+		err = fmt.Errorf("timeout to run task")
+	}
+	// time.Since uses the monotonic clock; calling UTC() on the end time
+	// would strip it and measure with the wall clock instead.
+	elapsed := time.Since(start)
+
+	if err != nil {
+		label := fmt.Sprintf("%s: run task: %s on node: %s fail: %v", task.FAILED, t.Name(), n.host.Address, err)
+		t.AddLabel(n.host.Address, label)
+		if task.IsIgnoreError(t) {
+			logrus.Warnf("ignore: %s", label)
+			n.updateNodeStatus("", IgnoreStatus)
+		} else {
+			logrus.Errorf("%s", label)
+			// set task status on node after task
+			n.updateNodeStatus(label, ErrorStatus)
+		}
+	} else {
+		t.AddLabel(n.host.Address, task.SUCCESS)
+		// set task status on node after task
+		n.updateNodeStatus("", FinishStatus)
+		logrus.Infof("run task: %s success on %s\n", t.Name(), n.host.Address)
+	}
+	n.addHistory(t, err, elapsed)
 }
func NewNode(hcf *api.HostConfig, r runner.Runner) (*Node, error) {
@@ -153,25 +221,7 @@ func NewNode(hcf *api.HostConfig, r runner.Runner) (*Node, error) {
case <-n.stop:
return
case t := <-n.queue:
- // set task status on node before run task
- err := t.Run(n.r, n.host)
- if err != nil {
- label := fmt.Sprintf("%s: run task: %s on node: %s fail: %v", task.FAILED, t.Name(), n.host.Address, err)
- t.AddLabel(n.host.Address, label)
- if task.IsIgnoreError(t) {
- logrus.Warnf("ignore: %s", label)
- n.updateNodeStatus("", IgnoreStatus)
- } else {
- logrus.Errorf("%s", label)
- // set task status on node after task
- n.updateNodeStatus(label, ErrorStatus)
- }
- } else {
- t.AddLabel(n.host.Address, task.SUCCESS)
- // set task status on node after task
- n.updateNodeStatus("", FinishStatus)
- logrus.Infof("run task: %s success on %s\n", t.Name(), n.host.Address)
- }
+ doRunTask(n, t)
}
}
}(n)
diff --git a/pkg/utils/task/task.go b/pkg/utils/task/task.go
index f896d83..8452b01 100644
--- a/pkg/utils/task/task.go
+++ b/pkg/utils/task/task.go
@@ -53,6 +53,16 @@ func NewTaskInstance(t TaskRun) *TaskInstance {
}
}
+// NewTaskIgnoreErrInstance builds a TaskInstance like NewTaskInstance and also
+// sets the IgnoreErr label (presumably what IsIgnoreError checks — confirm).
+func NewTaskIgnoreErrInstance(t TaskRun) *TaskInstance {
+	// Delegate so both constructors always initialize a TaskInstance the
+	// same way instead of duplicating the struct literal.
+	ti := NewTaskInstance(t)
+	ti.AddLabel(IgnoreErr, "true")
+	return ti
+}
+
func (t *TaskInstance) AddLabel(key, label string) {
t.l.Lock()
defer t.l.Unlock()
--
2.25.1