From 52f415c3888b9bd0484ba5f66b2e987e41d4f9fb Mon Sep 17 00:00:00 2001 From: haozi007 Date: Mon, 2 Aug 2021 07:24:24 +0100 Subject: [PATCH 8/9] refactor node manager 1. add a timeout for running tasks; 2. show the task history of each node; Signed-off-by: haozi007 --- .../binary/addons/runner_addons.go | 3 +- pkg/clusterdeployment/binary/binary.go | 7 +- .../binary/cleanupcluster/cleanupetcd.go | 6 +- .../binary/cleanupcluster/cleanuploadbalance.go | 3 +- .../binary/cleanupcluster/cleanupnode.go | 6 +- .../binary/coredns/binarycoredns.go | 3 +- .../binary/coredns/podcoredns.go | 3 +- .../binary/etcdcluster/etcdreconfig.go | 3 +- .../binary/infrastructure/infrastructure.go | 3 +- .../binary/network/network.go | 3 +- pkg/utils/nodemanager/node.go | 90 ++++++++++++++----- pkg/utils/task/task.go | 10 +++ 12 files changed, 93 insertions(+), 47 deletions(-) diff --git a/pkg/clusterdeployment/binary/addons/runner_addons.go b/pkg/clusterdeployment/binary/addons/runner_addons.go index f8c0dad..e5bc0ad 100644 --- a/pkg/clusterdeployment/binary/addons/runner_addons.go +++ b/pkg/clusterdeployment/binary/addons/runner_addons.go @@ -122,7 +122,7 @@ func cleanupAddons(cluster *api.ClusterConfig) error { yamlPath := filepath.Join(cluster.PackageSrc.GetPkgDstPath(), constants.DefaultFilePath) kubeconfig := filepath.Join(cluster.GetConfigDir(), constants.KubeConfigFileNameAdmin) - t := task.NewTaskInstance(&CleanupAddonsTask{ + t := task.NewTaskIgnoreErrInstance(&CleanupAddonsTask{ yaml: yaml, srcPath: yamlPath, kubeconfig: kubeconfig, @@ -134,7 +134,6 @@ func cleanupAddons(cluster *api.ClusterConfig) error { } } - task.SetIgnoreErrorFlag(t) useMaster, err := nodemanager.RunTaskOnOneNode(t, masters) if err != nil { return err diff --git a/pkg/clusterdeployment/binary/binary.go b/pkg/clusterdeployment/binary/binary.go index dd260b0..f901643 100644 --- a/pkg/clusterdeployment/binary/binary.go +++ b/pkg/clusterdeployment/binary/binary.go @@ -102,12 +102,11 @@ func (bcp *BinaryClusterDeployment) 
registerNodes() error { defer func() { if err != nil { bcp.Finish() - nodemanager.UnRegisterAllNodes() } }() for _, cfg := range bcp.config.Nodes { - err := bcp.registerNode(cfg) + err = bcp.registerNode(cfg) if err != nil { return err } @@ -408,9 +407,7 @@ func (bcp *BinaryClusterDeployment) Finish() { logrus.Info("do finish binary deployment...") bcp.connLock.Lock() defer bcp.connLock.Unlock() - for _, c := range bcp.connections { - c.Close() - } + nodemanager.UnRegisterAllNodes() bcp.connections = make(map[string]runner.Runner) logrus.Info("do finish binary deployment success") } diff --git a/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go b/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go index 81e1fe6..ed79246 100644 --- a/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go +++ b/pkg/clusterdeployment/binary/cleanupcluster/cleanupetcd.go @@ -84,13 +84,12 @@ func CleanupEtcdMember(conf *api.ClusterConfig, hostconfig *api.HostConfig) erro } // cleanup remains - taskCleanupEtcdMember := task.NewTaskInstance( + taskCleanupEtcdMember := task.NewTaskIgnoreErrInstance( &cleanupEtcdMemberTask{ ccfg: conf, }, ) - task.SetIgnoreErrorFlag(taskCleanupEtcdMember) if err := nodemanager.RunTaskOnNodes(taskCleanupEtcdMember, []string{hostconfig.Address}); err != nil { return fmt.Errorf("run task for cleanup etcd member failed: %v", err) } @@ -114,14 +113,13 @@ func CleanupAllEtcds(conf *api.ClusterConfig) error { } // cleanup remains - taskCleanupAllEtcds := task.NewTaskInstance( + taskCleanupAllEtcds := task.NewTaskIgnoreErrInstance( &cleanupEtcdMemberTask{ ccfg: conf, }, ) nodes := utils.GetAllIPs(conf.EtcdCluster.Nodes) - task.SetIgnoreErrorFlag(taskCleanupAllEtcds) if err := nodemanager.RunTaskOnNodes(taskCleanupAllEtcds, nodes); err != nil { return fmt.Errorf("run task for cleanup all etcds failed: %v", err) } diff --git a/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go 
b/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go index 010ef84..d836f99 100644 --- a/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go +++ b/pkg/clusterdeployment/binary/cleanupcluster/cleanuploadbalance.go @@ -56,13 +56,12 @@ func (t *cleanupLoadBalanceTask) Run(r runner.Runner, hostConfig *api.HostConfig } func CleanupLoadBalance(conf *api.ClusterConfig, lb *api.HostConfig) error { - taskCleanupLoadBalance := task.NewTaskInstance( + taskCleanupLoadBalance := task.NewTaskIgnoreErrInstance( &cleanupLoadBalanceTask{ ccfg: conf, }, ) - task.SetIgnoreErrorFlag(taskCleanupLoadBalance) if err := nodemanager.RunTaskOnNodes(taskCleanupLoadBalance, []string{lb.Address}); err != nil { return fmt.Errorf("run task for cleanup loadbalance failed: %v", err) } diff --git a/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go b/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go index da0488a..9d2d009 100644 --- a/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go +++ b/pkg/clusterdeployment/binary/cleanupcluster/cleanupnode.go @@ -218,7 +218,7 @@ func (t *removeWorkerTask) Run(r runner.Runner, hostConfig *api.HostConfig) erro } func execRemoveWorkerTask(conf *api.ClusterConfig, hostconfig *api.HostConfig) error { - taskRemoveWorker := task.NewTaskInstance( + taskRemoveWorker := task.NewTaskIgnoreErrInstance( &removeWorkerTask{ ccfg: conf, workerName: hostconfig.Name, @@ -230,7 +230,6 @@ func execRemoveWorkerTask(conf *api.ClusterConfig, hostconfig *api.HostConfig) e return fmt.Errorf("failed to get first master") } - task.SetIgnoreErrorFlag(taskRemoveWorker) if err := nodemanager.RunTaskOnNodes(taskRemoveWorker, []string{master}); err != nil { return err } @@ -249,14 +248,13 @@ func CleanupNode(conf *api.ClusterConfig, hostconfig *api.HostConfig, delType ui } } - taskCleanupNode := task.NewTaskInstance( + taskCleanupNode := task.NewTaskIgnoreErrInstance( &cleanupNodeTask{ ccfg: conf, delType: delType, }, ) - 
task.SetIgnoreErrorFlag(taskCleanupNode) if err := nodemanager.RunTaskOnNodes(taskCleanupNode, []string{hostconfig.Address}); err != nil { return fmt.Errorf("run task for cleanup cluster failed: %v", err) } diff --git a/pkg/clusterdeployment/binary/coredns/binarycoredns.go b/pkg/clusterdeployment/binary/coredns/binarycoredns.go index db9bcd5..411dba2 100644 --- a/pkg/clusterdeployment/binary/coredns/binarycoredns.go +++ b/pkg/clusterdeployment/binary/coredns/binarycoredns.go @@ -362,14 +362,13 @@ func (bc *BinaryCoredns) Cleanup(cluster *api.ClusterConfig) error { logrus.Warn("no master host found, can not cleanup coredns service") return nil } - sst := task.NewTaskInstance( + sst := task.NewTaskIgnoreErrInstance( &BinaryCorednsCleanupTask{ Cluster: cluster, cleanYaml: true, }, ) - task.SetIgnoreErrorFlag(sst) err := nodemanager.RunTaskOnNodes(sst, masterIPs) if err != nil { logrus.Warnf("run cleanup coredns task failed: %v", err) diff --git a/pkg/clusterdeployment/binary/coredns/podcoredns.go b/pkg/clusterdeployment/binary/coredns/podcoredns.go index c27f1d1..28ae908 100644 --- a/pkg/clusterdeployment/binary/coredns/podcoredns.go +++ b/pkg/clusterdeployment/binary/coredns/podcoredns.go @@ -150,7 +150,7 @@ func (pc *PodCoredns) Cleanup(cluster *api.ClusterConfig) error { if cluster == nil { return fmt.Errorf("invalid cluster config") } - t := task.NewTaskInstance(&PodCorednsCleanupTask{Cluster: cluster}) + t := task.NewTaskIgnoreErrInstance(&PodCorednsCleanupTask{Cluster: cluster}) var masters []string for _, n := range cluster.Nodes { if (n.Type & api.Master) != 0 { @@ -158,7 +158,6 @@ func (pc *PodCoredns) Cleanup(cluster *api.ClusterConfig) error { } } - task.SetIgnoreErrorFlag(t) useMaster, err := nodemanager.RunTaskOnOneNode(t, masters) if err != nil { return err diff --git a/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go b/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go index b417861..aadc4a8 100644 --- 
a/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go +++ b/pkg/clusterdeployment/binary/etcdcluster/etcdreconfig.go @@ -196,13 +196,12 @@ func (t *removeEtcdsTask) Run(r runner.Runner, hostConfig *api.HostConfig) error } func execRemoveEtcdsTask(conf *api.ClusterConfig, node string) error { - taskRemoveEtcds := task.NewTaskInstance( + taskRemoveEtcds := task.NewTaskIgnoreErrInstance( &removeEtcdsTask{ ccfg: conf, }, ) - task.SetIgnoreErrorFlag(taskRemoveEtcds) if err := nodemanager.RunTaskOnNodes(taskRemoveEtcds, []string{node}); err != nil { logrus.Errorf("run task for remove etcds failed: %v", err) return err diff --git a/pkg/clusterdeployment/binary/infrastructure/infrastructure.go b/pkg/clusterdeployment/binary/infrastructure/infrastructure.go index bf6ed3a..407c4a8 100644 --- a/pkg/clusterdeployment/binary/infrastructure/infrastructure.go +++ b/pkg/clusterdeployment/binary/infrastructure/infrastructure.go @@ -419,13 +419,12 @@ func NodeInfrastructureDestroy(config *api.ClusterConfig, hostconfig *api.HostCo return fmt.Errorf("do not register %d roleinfra", hostconfig.Type) } - itask := task.NewTaskInstance( + itask := task.NewTaskIgnoreErrInstance( &DestroyInfraTask{ packageSrc: &config.PackageSrc, roleInfra: roleInfra, }) - task.SetIgnoreErrorFlag(itask) if err := nodemanager.RunTaskOnNodes(itask, []string{hostconfig.Address}); err != nil { return fmt.Errorf("destroy infrastructure Task failed: %v", err) } diff --git a/pkg/clusterdeployment/binary/network/network.go b/pkg/clusterdeployment/binary/network/network.go index e1cd73f..0c35a9c 100644 --- a/pkg/clusterdeployment/binary/network/network.go +++ b/pkg/clusterdeployment/binary/network/network.go @@ -121,7 +121,7 @@ func CleanupNetwork(cluster *api.ClusterConfig) error { if cluster == nil { return fmt.Errorf("invalid cluster config") } - t := task.NewTaskInstance(&CleanupNetworkTask{Cluster: cluster}) + t := task.NewTaskIgnoreErrInstance(&CleanupNetworkTask{Cluster: cluster}) var masters []string for 
_, n := range cluster.Nodes { if (n.Type & api.Master) != 0 { @@ -129,7 +129,6 @@ func CleanupNetwork(cluster *api.ClusterConfig) error { } } - task.SetIgnoreErrorFlag(t) useMaster, err := nodemanager.RunTaskOnOneNode(t, masters) if err != nil { return err diff --git a/pkg/utils/nodemanager/node.go b/pkg/utils/nodemanager/node.go index a19d2e7..08af098 100644 --- a/pkg/utils/nodemanager/node.go +++ b/pkg/utils/nodemanager/node.go @@ -17,6 +17,7 @@ package nodemanager import ( "fmt" + "strings" "sync" "time" @@ -54,6 +55,12 @@ func (ns NodeStatus) ShowCounts() string { return fmt.Sprintf("{ total: %d, success: %d, fail: %d, ignore: %d }", ns.TaskTotalCnt, ns.TaskSuccessCnt, ns.TaskFailCnt, ns.TaskIgnoreCnt) } +type taskSummary struct { + name string + useTime time.Duration + status string +} + type Node struct { host *api.HostConfig r runner.Runner @@ -62,6 +69,32 @@ type Node struct { queue chan task.Task lock sync.RWMutex status NodeStatus + + tasksHistory []taskSummary +} + +func (n *Node) addHistory(t task.Task, err error, useTime time.Duration) { + ts := taskSummary{name: t.Name(), useTime: useTime} + if err == nil { + ts.status = "success" + } else { + if task.IsIgnoreError(t) { + ts.status = fmt.Sprintf("ignore err: %v", err) + } else { + ts.status = err.Error() + } + } + n.tasksHistory = append(n.tasksHistory, ts) +} + +func (n *Node) ShowTaskList() string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("\n##################tasks on node: %s#################\n", n.host.Name)) + for _, n := range n.tasksHistory { + sb.WriteString(fmt.Sprintf("name: %s, elapsed time: %s, message: %s\n", n.name, n.useTime.String(), n.status)) + } + sb.WriteString("#########################################\n") + return sb.String() } func (n *Node) GetStatus() NodeStatus { @@ -135,7 +168,42 @@ func (n *Node) PushTask(t task.Task) bool { func (n *Node) Finish() { n.stop <- true n.r.Close() - logrus.Infof("node: %s is finished", n.host.Address) + 
logrus.Info(n.ShowTaskList()) +} + +func doRunTask(n *Node, t task.Task) { + start := time.Now() + echan := make(chan error, 1) // buffered: a Run that outlives the timeout can still deliver its result and let its goroutine exit + go func(ec chan<- error) { + ec <- t.Run(n.r, n.host) + }(echan) + var err error + // Run executes in its own goroutine so the select below really races it against the timer (a send operand inside select is evaluated before the select blocks); Run has no cancellation hook, so on timeout it is abandoned, not stopped. TODO: maybe we need get timeout from task + select { + case err = <-echan: + case <-time.After(time.Second * 300): + err = fmt.Errorf("timeout to run task") + } + elapsed := time.Since(start) // keeps the monotonic reading; t.UTC() would strip it and fall back to wall-clock time + + if err != nil { + label := fmt.Sprintf("%s: run task: %s on node: %s fail: %v", task.FAILED, t.Name(), n.host.Address, err) + t.AddLabel(n.host.Address, label) + if task.IsIgnoreError(t) { + logrus.Warnf("ignore: %s", label) + n.updateNodeStatus("", IgnoreStatus) + } else { + logrus.Errorf("%s", label) + // set task status on node after task + n.updateNodeStatus(label, ErrorStatus) + } + } else { + t.AddLabel(n.host.Address, task.SUCCESS) + // set task status on node after task + n.updateNodeStatus("", FinishStatus) + logrus.Infof("run task: %s success on %s\n", t.Name(), n.host.Address) + } + n.addHistory(t, err, elapsed) } func NewNode(hcf *api.HostConfig, r runner.Runner) (*Node, error) { @@ -153,25 +221,7 @@ func NewNode(hcf *api.HostConfig, r runner.Runner) (*Node, error) { case <-n.stop: return case t := <-n.queue: - // set task status on node before run task - err := t.Run(n.r, n.host) - if err != nil { - label := fmt.Sprintf("%s: run task: %s on node: %s fail: %v", task.FAILED, t.Name(), n.host.Address, err) - t.AddLabel(n.host.Address, label) - if task.IsIgnoreError(t) { - logrus.Warnf("ignore: %s", label) - n.updateNodeStatus("", IgnoreStatus) - } else { - logrus.Errorf("%s", label) - // set task status on node after task - n.updateNodeStatus(label, ErrorStatus) - } - } else { - t.AddLabel(n.host.Address, task.SUCCESS) - // set task status on node after task - n.updateNodeStatus("", FinishStatus) - logrus.Infof("run task: %s success on %s\n", t.Name(), n.host.Address) - } + doRunTask(n, t) } } }(n) diff --git a/pkg/utils/task/task.go b/pkg/utils/task/task.go
index f896d83..8452b01 100644 --- a/pkg/utils/task/task.go +++ b/pkg/utils/task/task.go @@ -53,6 +53,16 @@ func NewTaskInstance(t TaskRun) *TaskInstance { } } +func NewTaskIgnoreErrInstance(t TaskRun) *TaskInstance { + ti := &TaskInstance{ + data: make(map[string]string), + TaskRun: t, + } + + ti.AddLabel(IgnoreErr, "true") + return ti +} + func (t *TaskInstance) AddLabel(key, label string) { t.l.Lock() defer t.l.Unlock() -- 2.25.1