469 lines
15 KiB
Diff
469 lines
15 KiB
Diff
From f1d2766bda44a878777ea266573236271dcc65ed Mon Sep 17 00:00:00 2001
|
||
From: haozi007 <liuhao27@huawei.com>
|
||
Date: Thu, 11 Nov 2021 11:15:37 +0000
|
||
Subject: [PATCH 11/12] run hook with envs of cluster info
|
||
|
||
Signed-off-by: haozi007 <liuhao27@huawei.com>
|
||
---
|
||
docs/hooks_of_eggo.md | 2 +-
|
||
pkg/api/tools.go | 29 ++++++++
|
||
pkg/api/types.go | 42 ++++++++++++
|
||
.../binary/infrastructure/infrastructure.go | 10 +--
|
||
pkg/utils/dependency/dependency.go | 40 ++++++++---
|
||
pkg/utils/dependency/dependency_test.go | 52 +++++++++++++++
|
||
pkg/utils/dependency/install.go | 66 +++++++++++++++----
|
||
pkg/utils/runner/runner.go | 16 ++---
|
||
8 files changed, 216 insertions(+), 41 deletions(-)
|
||
create mode 100644 pkg/utils/dependency/dependency_test.go
|
||
|
||
diff --git a/docs/hooks_of_eggo.md b/docs/hooks_of_eggo.md
|
||
index 92b9a38..b1f09cb 100644
|
||
--- a/docs/hooks_of_eggo.md
|
||
+++ b/docs/hooks_of_eggo.md
|
||
@@ -22,6 +22,7 @@
|
||
|
||
- 脚本目录下的所有脚本都会被执行,而子目录中的脚本不会被执行;
|
||
- 每个脚本的超时时间为60s;
|
||
+- role可以为master,worker,etcd或者loadbalance;
|
||
|
||
### 配置文件参数方式
|
||
|
||
@@ -60,4 +61,3 @@ eggo会在hook执行时,通过环境变量传递部分信息,用于脚本执
|
||
| EGGO_NODE_ROLE | hook执行的节点角色 |
|
||
| EGGO_HOOK_TYPE | hook的类型,prehook或者posthook |
|
||
| EGGO_OPERATOR | 当前的操作,deploy,cleanup,join,delete。 |
|
||
-
|
||
diff --git a/pkg/api/tools.go b/pkg/api/tools.go
|
||
index 861d70a..4c65dc2 100644
|
||
--- a/pkg/api/tools.go
|
||
+++ b/pkg/api/tools.go
|
||
@@ -55,6 +55,10 @@ func (p PackageSrcConfig) GetPkgDstPath() string {
|
||
return p.DstPath
|
||
}
|
||
|
||
+// GetUrl returns the endpoint as "<AdvertiseAddress>:<BindPort>".
|
||
+func (ep APIEndpoint) GetUrl() string {
|
||
+ // Address and port are joined with ':' (host:port); a '/' separator would
|
||
+ // turn the port into a path component.
|
||
+ return fmt.Sprintf("%s:%v", ep.AdvertiseAddress, ep.BindPort)
|
||
+}
|
||
+
|
||
func GetClusterHomePath(cluster string) string {
|
||
return filepath.Join(EggoHomePath, cluster)
|
||
}
|
||
@@ -191,3 +195,28 @@ func ParseScheduleType(schedule string) (ScheduleType, error) {
|
||
return SchedulePreJoin, fmt.Errorf("invalid schedule type: %s", schedule)
|
||
}
|
||
}
|
||
+
|
||
+func GetRoleString(roles uint16) []string { // GetRoleString returns the names of the roles whose bits are set in roles.
|
||
+ var roleStrs []string // stays nil when none of the four role bits is set
|
||
+ if roles&Master != 0 { // roles is a bitmask: each role bit is tested independently, so several names may be returned
|
||
+ roleStrs = append(roleStrs, "master")
|
||
+ }
|
||
+ if roles&Worker != 0 {
|
||
+ roleStrs = append(roleStrs, "worker")
|
||
+ }
|
||
+ if roles&ETCD != 0 {
|
||
+ roleStrs = append(roleStrs, "etcd")
|
||
+ }
|
||
+ if roles&LoadBalance != 0 {
|
||
+ roleStrs = append(roleStrs, "loadbalance")
|
||
+ }
|
||
+
|
||
+ return roleStrs // order is always master, worker, etcd, loadbalance
|
||
+}
|
||
+
|
||
+func GetUserTempDir(user string) string { // GetUserTempDir returns the copy temp directory for the given login user.
|
||
+ if user == "root" { // root uses a fixed directory instead of the per-user format
|
||
+ return constants.DefaultRootCopyTempDirHome
|
||
+ }
|
||
+ return fmt.Sprintf(constants.DefaultUserCopyTempHomeFormat, user) // NOTE(review): the runner helper removed by this patch appended "/temp" to both results; confirm the runner callers are fine without it
|
||
+}
|
||
diff --git a/pkg/api/types.go b/pkg/api/types.go
|
||
index 6a1351e..fea3f26 100644
|
||
--- a/pkg/api/types.go
|
||
+++ b/pkg/api/types.go
|
||
@@ -35,6 +35,37 @@ const (
|
||
SchedulePostCleanup ScheduleType = "postcleanup"
|
||
)
|
||
|
||
+type HookOperator string // HookOperator names the operation a hook runs for; ExecuteHooks exports it to hook scripts as EGGO_OPERATOR.
|
||
+
|
||
+const ( // the four operations listed for EGGO_OPERATOR in docs/hooks_of_eggo.md
|
||
+ HookOpDeploy HookOperator = "deploy"
|
||
+ HookOpCleanup HookOperator = "cleanup"
|
||
+ HookOpJoin HookOperator = "join"
|
||
+ HookOpDelete HookOperator = "delete"
|
||
+)
|
||
+
|
||
+type HookType string // HookType marks a hook as prehook or posthook; ExecuteHooks exports it to hook scripts as EGGO_HOOK_TYPE.
|
||
+
|
||
+const (
|
||
+ PreHookType HookType = "prehook" // default chosen by executeShell
|
||
+ PostHookType HookType = "posthook" // chosen by executeShell when the schedule name starts with "post"
|
||
+)
|
||
+
|
||
+type HookRunConfig struct { // HookRunConfig carries everything ExecuteHooks needs to run a set of hook scripts on one node.
|
||
+ ClusterID string // exported as EGGO_CLUSTER_ID
|
||
+ ClusterApiEndpoint string // exported as EGGO_CLUSTER_API_ENDPOINT
|
||
+ ClusterConfigDir string // exported as EGGO_CLUSTER_CONFIG_DIR
|
||
+
|
||
+ HookType HookType // exported as EGGO_HOOK_TYPE
|
||
+ Operator HookOperator // exported as EGGO_OPERATOR
|
||
+
|
||
+ Node *HostConfig // node the hooks run on; its Address, Name, Arch and Type feed the EGGO_NODE_* variables
|
||
+ Scheduler ScheduleType // used for logging and to ignore hook errors on cleanup schedules
|
||
+
|
||
+ HookDir string // directory on the node that is joined with each hook name to form the script path
|
||
+ Hooks []*PackageConfig // scripts to run; ExecuteHooks is a no-op when this is empty
|
||
+}
|
||
+
|
||
type RoleInfra struct {
|
||
OpenPorts []*OpenPorts `json:"open-ports"`
|
||
Softwares []*PackageConfig `json:"softwares"`
|
||
@@ -201,6 +232,14 @@ type AddonConfig struct {
|
||
Filename string `json:"filename"`
|
||
}
|
||
|
||
+type ClusterHookConf struct { // ClusterHookConf describes hooks attached to a ClusterConfig through its HooksConf field, which is not JSON-encoded.
|
||
+ Type HookType // prehook or posthook
|
||
+ Operator HookOperator // operation the hooks are tied to
|
||
+ Target uint16 // presumably a role bitmask like the one GetRoleString decodes — TODO confirm
|
||
+ HookDir string // presumably the directory holding the hook files — TODO confirm
|
||
+ HookFiles []string // presumably hook file names under HookDir — TODO confirm
|
||
+}
|
||
+
|
||
type ClusterConfig struct {
|
||
Name string `json:"name"`
|
||
DeployDriver string `json:"deploy-driver"` // default is binary
|
||
@@ -218,6 +257,9 @@ type ClusterConfig struct {
|
||
WorkerConfig WorkerConfig `json:"workerconfig"`
|
||
RoleInfra map[uint16]*RoleInfra `json:"role-infra"`
|
||
|
||
+ // do not encode hooks, just set before use it
|
||
+ HooksConf *ClusterHookConf `json:"-"`
|
||
+
|
||
// TODO: add other configurations at here
|
||
}
|
||
|
||
diff --git a/pkg/clusterdeployment/binary/infrastructure/infrastructure.go b/pkg/clusterdeployment/binary/infrastructure/infrastructure.go
|
||
index 634e338..68faf36 100644
|
||
--- a/pkg/clusterdeployment/binary/infrastructure/infrastructure.go
|
||
+++ b/pkg/clusterdeployment/binary/infrastructure/infrastructure.go
|
||
@@ -27,7 +27,6 @@ import (
|
||
|
||
"isula.org/eggo/pkg/api"
|
||
"isula.org/eggo/pkg/clusterdeployment/binary/cleanupcluster"
|
||
- "isula.org/eggo/pkg/constants"
|
||
"isula.org/eggo/pkg/utils"
|
||
"isula.org/eggo/pkg/utils/dependency"
|
||
"isula.org/eggo/pkg/utils/nodemanager"
|
||
@@ -321,13 +320,6 @@ func (it *DestroyInfraTask) Name() string {
|
||
return "DestroyInfraTask"
|
||
}
|
||
|
||
-func getCopyDefaultDir(user string) string {
|
||
- if user == "root" {
|
||
- return constants.DefaultRootCopyTempDirHome
|
||
- }
|
||
- return fmt.Sprintf(constants.DefaultUserCopyTempHomeFormat, user)
|
||
-}
|
||
-
|
||
func (it *DestroyInfraTask) Run(r runner.Runner, hcg *api.HostConfig) error {
|
||
if hcg == nil {
|
||
return fmt.Errorf("empty host config")
|
||
@@ -348,7 +340,7 @@ func (it *DestroyInfraTask) Run(r runner.Runner, hcg *api.HostConfig) error {
|
||
logrus.Errorf("path %s not in White List and cannot remove", dstDir)
|
||
return nil
|
||
}
|
||
- copyTempDir := getCopyDefaultDir(hcg.UserName)
|
||
+ copyTempDir := api.GetUserTempDir(hcg.UserName)
|
||
if _, err := r.RunCommand(fmt.Sprintf("sudo -E /bin/sh -c \"rm -rf %s %s %s\"", dstDir, copyTempDir, it.k8sConfigDir)); err != nil {
|
||
return fmt.Errorf("rm dependency failed: %v", err)
|
||
}
|
||
diff --git a/pkg/utils/dependency/dependency.go b/pkg/utils/dependency/dependency.go
|
||
index 9e6ac22..2c5dc26 100644
|
||
--- a/pkg/utils/dependency/dependency.go
|
||
+++ b/pkg/utils/dependency/dependency.go
|
||
@@ -293,6 +293,7 @@ func (dy *dependencyYaml) Remove(r runner.Runner) error {
|
||
}
|
||
|
||
type dependencyShell struct { // dependencyShell runs a list of shell scripts on a node through a generated wrapper script.
|
||
+ envs []string // "KEY=VALUE" entries, each emitted as an `export` line before the scripts run
|
||
srcPath string // directory joined with each script name to form the path that is executed
|
||
shell []*api.PackageConfig // scripts to run; Install falls back to a 30s timeout when TimeOut is empty
|
||
}
|
||
@@ -305,22 +306,45 @@ func NewDependencyShell(srcPath string, shell []*api.PackageConfig) *dependencyS
|
||
}
|
||
|
||
func (ds *dependencyShell) Install(r runner.Runner) error {
|
||
- var sb strings.Builder
|
||
+ shellTemplate := `
|
||
+#!/bin/bash
|
||
+{{- range $i, $v := .Envs }}
|
||
+export {{ $v }}
|
||
+{{- end }}
|
||
|
||
- sb.WriteString("sudo -E /bin/sh -c \"")
|
||
- for _, s := range ds.shell {
|
||
- sb.WriteString(fmt.Sprintf("chmod +x %s/%s && ", ds.srcPath, s.Name))
|
||
+{{- $tout := .Timeouts }}
|
||
+{{- range $i, $v := .Shells }}
|
||
+chmod +x {{ $v }} && timeout -s SIGKILL {{index $tout $i}} {{ $v }} > /dev/null
|
||
+if [ $? -ne 0 ]; then
|
||
+ echo "run {{ $v }} failed"
|
||
+ exit 1
|
||
+fi
|
||
+{{- end }}
|
||
|
||
+exit 0
|
||
+`
|
||
+ datastore := map[string]interface{}{}
|
||
+ datastore["Envs"] = ds.envs
|
||
+ var shells []string
|
||
+ var timeouts []string
|
||
+ for _, s := range ds.shell {
|
||
+ shells = append(shells, fmt.Sprintf("%s/%s", ds.srcPath, s.Name))
|
||
timeout := s.TimeOut
|
||
if timeout == "" {
|
||
timeout = "30s"
|
||
}
|
||
- sb.WriteString(fmt.Sprintf("timeout -s SIGKILL %s %s/%s > /dev/null ; ", timeout, ds.srcPath, s.Name))
|
||
+ timeouts = append(timeouts, timeout)
|
||
}
|
||
- sb.WriteString("\"")
|
||
+ datastore["Shells"] = shells
|
||
+ datastore["Timeouts"] = timeouts
|
||
|
||
- if _, err := r.RunCommand(sb.String()); err != nil {
|
||
- return fmt.Errorf("shell execute failed: %v", err)
|
||
+ parsedShell, err := template.TemplateRender(shellTemplate, datastore)
|
||
+ if err != nil {
|
||
+ return err
|
||
+ }
|
||
+
|
||
+ if _, err := r.RunShell(parsedShell, "exechook"); err != nil {
|
||
+ return fmt.Errorf("hook execute failed: %v", err)
|
||
}
|
||
|
||
return nil
|
||
diff --git a/pkg/utils/dependency/dependency_test.go b/pkg/utils/dependency/dependency_test.go
|
||
new file mode 100644
|
||
index 0000000..58ea756
|
||
--- /dev/null
|
||
+++ b/pkg/utils/dependency/dependency_test.go
|
||
@@ -0,0 +1,52 @@
|
||
+package dependency
|
||
+
|
||
+import (
|
||
+ "testing"
|
||
+
|
||
+ "github.com/sirupsen/logrus"
|
||
+ "isula.org/eggo/pkg/api"
|
||
+)
|
||
+
|
||
+type MockRunner struct { // MockRunner is a runner stub for tests: every method only logs its arguments and reports success.
|
||
+}
|
||
+
|
||
+func (m *MockRunner) Copy(src, dst string) error { // Copy logs the requested copy and returns nil.
|
||
+ logrus.Infof("copy %s to %s", src, dst)
|
||
+ return nil
|
||
+}
|
||
+
|
||
+func (m *MockRunner) RunCommand(cmd string) (string, error) { // RunCommand logs cmd and returns empty output.
|
||
+ logrus.Infof("run command: %s", cmd)
|
||
+ return "", nil
|
||
+}
|
||
+
|
||
+func (m *MockRunner) RunShell(shell string, name string) (string, error) { // RunShell logs the script text and its name and returns empty output.
|
||
+ logrus.Infof("run shell: %s, name: %s", shell, name)
|
||
+ return "", nil
|
||
+}
|
||
+
|
||
+func (m *MockRunner) Reconnect() error { // Reconnect logs the call and returns nil.
|
||
+ logrus.Infof("reconnect")
|
||
+ return nil
|
||
+}
|
||
+
|
||
+func (m *MockRunner) Close() { // Close logs the call.
|
||
+ logrus.Infof("close")
|
||
+}
|
||
+
|
||
+func TestNewDependencyShell(t *testing.T) { // Smoke test: Install must succeed for a single shell package when run against MockRunner.
|
||
+ var mr MockRunner
|
||
+
|
||
+ shell := &api.PackageConfig{
|
||
+ Name: "test.sh",
|
||
+ Type: "shell",
|
||
+ Dst: "/root",
|
||
+ Schedule: api.SchedulePreJoin,
|
||
+ TimeOut: "30s",
|
||
+ }
|
||
+
|
||
+ dp := NewDependencyShell("/tmp", []*api.PackageConfig{shell})
|
||
+ if err := dp.Install(&mr); err != nil { // only the returned error is checked; the generated script is just logged by the mock
|
||
+ t.Fatalf("run test failed: %v", err)
|
||
+ }
|
||
+}
|
||
diff --git a/pkg/utils/dependency/install.go b/pkg/utils/dependency/install.go
|
||
index 8687602..8cb80f6 100644
|
||
--- a/pkg/utils/dependency/install.go
|
||
+++ b/pkg/utils/dependency/install.go
|
||
@@ -349,33 +349,77 @@ func getShell(roleInfra *api.RoleInfra, schedule api.ScheduleType) []*api.Packag
|
||
return shell
|
||
}
|
||
|
||
-func ExecuteShell(roleInfra *api.RoleInfra, packagePath string, hcf *api.HostConfig, schedule api.ScheduleType) error {
|
||
- shell := getShell(roleInfra, schedule)
|
||
- if len(shell) == 0 {
|
||
+// ExecuteHooks runs hookConf.Hooks on hookConf.Node with the cluster, node and
|
||
+// hook details exported as EGGO_* environment variables. It returns nil when
|
||
+// hookConf is nil or has no hooks, and an error when the node is missing or
|
||
+// the task fails. Hook errors are flagged as ignorable for cleanup schedules.
|
||
+func ExecuteHooks(hookConf *api.HookRunConfig) error {
|
||
+ if hookConf == nil || len(hookConf.Hooks) == 0 {
|
||
return nil
|
||
}
|
||
+ // Node is dereferenced for logging, the env list and the task target,
|
||
+ // so reject a missing node up front instead of panicking.
|
||
+ if hookConf.Node == nil {
|
||
+ return fmt.Errorf("empty host config")
|
||
+ }
|
||
- logrus.Debugf("run %s shell %v on %v\n", string(schedule), shell, hcf.Address)
|
||
|
||
- dp := &dependencyShell{
|
||
- srcPath: path.Join(packagePath, constants.DefaultFilePath),
|
||
- shell: shell,
|
||
+ var hookStr []string
|
||
+ for _, h := range hookConf.Hooks {
|
||
+ hookStr = append(hookStr, h.Name)
|
||
}
|
||
+ logrus.Debugf("run %s shell %v on %v\n", string(hookConf.Scheduler), hookStr, hookConf.Node.Address)
|
||
+
|
||
+ dp := &dependencyShell{
|
||
+ srcPath: hookConf.HookDir,
|
||
+ shell: hookConf.Hooks,
|
||
+ }
|
||
+ // Each entry becomes one `export KEY=VALUE` line in the generated script.
|
||
+ envs := make([]string, 9)
|
||
+ envs[0] = fmt.Sprintf("EGGO_CLUSTER_ID=%s", hookConf.ClusterID)
|
||
+ envs[1] = fmt.Sprintf("EGGO_CLUSTER_API_ENDPOINT=%s", hookConf.ClusterApiEndpoint)
|
||
+ envs[2] = fmt.Sprintf("EGGO_CLUSTER_CONFIG_DIR=%s", hookConf.ClusterConfigDir)
|
||
+ envs[3] = fmt.Sprintf("EGGO_NODE_IP=%s", hookConf.Node.Address)
|
||
+ envs[4] = fmt.Sprintf("EGGO_NODE_NAME=%s", hookConf.Node.Name)
|
||
+ envs[5] = fmt.Sprintf("EGGO_NODE_ARCH=%s", hookConf.Node.Arch)
|
||
+ envs[6] = fmt.Sprintf("EGGO_NODE_ROLE=%s", strings.Join(api.GetRoleString(hookConf.Node.Type), ","))
|
||
+ envs[7] = fmt.Sprintf("EGGO_HOOK_TYPE=%s", hookConf.HookType)
|
||
+ envs[8] = fmt.Sprintf("EGGO_OPERATOR=%s", hookConf.Operator)
|
||
+ dp.envs = envs
|
||
|
||
dependencyTask := task.NewTaskInstance(&DependencyTask{
|
||
dp: dp,
|
||
})
|
||
|
||
- if api.IsCleanupSchedule(schedule) {
|
||
+ if api.IsCleanupSchedule(hookConf.Scheduler) {
|
||
task.SetIgnoreErrorFlag(dependencyTask)
|
||
}
|
||
- if err := nodemanager.RunTaskOnNodes(dependencyTask, []string{hcf.Address}); err != nil {
|
||
- logrus.Errorf("Hook %s failed for %s: %v", string(api.SchedulePreJoin), hcf.Address, err)
|
||
+ if err := nodemanager.RunTaskOnNodes(dependencyTask, []string{hookConf.Node.Address}); err != nil {
|
||
+ // Report the schedule that was actually run, not a fixed "prejoin".
|
||
+ logrus.Errorf("Hook %s failed for %s: %v", string(hookConf.Scheduler), hookConf.Node.Address, err)
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
+// executeShell runs the shell hooks configured for role at the given schedule
|
||
+// on node hcf. It returns nil when no hook matches, otherwise the result of
|
||
+// ExecuteHooks.
|
||
+func executeShell(ccfg *api.ClusterConfig, role uint16, hcf *api.HostConfig, schedule api.ScheduleType) error {
|
||
+ shell := getShell(ccfg.RoleInfra[role], schedule)
|
||
+ if len(shell) == 0 {
|
||
+ return nil
|
||
+ }
|
||
+
|
||
+ // Hook type and operator are derived from the schedule name:
|
||
+ // a "post" prefix means posthook, a "cleanup" suffix means cleanup.
|
||
+ htype := api.PreHookType
|
||
+ if strings.HasPrefix(string(schedule), "post") {
|
||
+ htype = api.PostHookType
|
||
+ }
|
||
+ oper := api.HookOpJoin
|
||
+ if strings.HasSuffix(string(schedule), "cleanup") {
|
||
+ oper = api.HookOpCleanup
|
||
+ }
|
||
+
|
||
+ hookConf := &api.HookRunConfig{
|
||
+ ClusterID: ccfg.Name,
|
||
+ ClusterApiEndpoint: ccfg.APIEndpoint.GetUrl(),
|
||
+ ClusterConfigDir: ccfg.ConfigDir,
|
||
+ HookType: htype,
|
||
+ Operator: oper,
|
||
+ Node: hcf,
|
||
+ // ExecuteHooks reads Scheduler for its logs and to decide whether
|
||
+ // hook errors are ignored on cleanup, so it must be passed along.
|
||
+ Scheduler: schedule,
|
||
+ HookDir: path.Join(ccfg.PackageSrc.GetPkgDstPath(), constants.DefaultFilePath),
|
||
+ Hooks: shell,
|
||
+ }
|
||
+
|
||
+ return ExecuteHooks(hookConf)
|
||
+}
|
||
+
|
||
func HookSchedule(ccfg *api.ClusterConfig, nodes []*api.HostConfig, role []uint16, schedule api.ScheduleType) error {
|
||
for _, n := range nodes {
|
||
for _, r := range role {
|
||
@@ -383,7 +427,7 @@ func HookSchedule(ccfg *api.ClusterConfig, nodes []*api.HostConfig, role []uint1
|
||
continue
|
||
}
|
||
|
||
- if err := ExecuteShell(ccfg.RoleInfra[r], ccfg.PackageSrc.GetPkgDstPath(), n, schedule); err != nil {
|
||
+ if err := executeShell(ccfg, r, n, schedule); err != nil {
|
||
if api.IsCleanupSchedule(schedule) {
|
||
logrus.Errorf("execute shell failed for %s at %s: %v", n.Address, string(schedule), err)
|
||
} else {
|
||
diff --git a/pkg/utils/runner/runner.go b/pkg/utils/runner/runner.go
|
||
index c7088df..83a81e9 100644
|
||
--- a/pkg/utils/runner/runner.go
|
||
+++ b/pkg/utils/runner/runner.go
|
||
@@ -30,7 +30,6 @@ import (
|
||
"github.com/kubesphere/kubekey/pkg/util/ssh"
|
||
"github.com/sirupsen/logrus"
|
||
"isula.org/eggo/pkg/api"
|
||
- "isula.org/eggo/pkg/constants"
|
||
)
|
||
|
||
const (
|
||
@@ -164,7 +163,7 @@ func (ssh *SSHRunner) Reconnect() error {
|
||
func clearUserTempDir(conn ssh.Connection, host *kkv1alpha1.HostCfg) {
|
||
tmpShell := "/tmp/" + RunnerShellPrefix + "*"
|
||
// scp to tmp file
|
||
- dir := getCopyDefaultDir(host.User)
|
||
+ dir := api.GetUserTempDir(host.User)
|
||
_, err := conn.Exec(fmt.Sprintf("sudo -E /bin/sh -c \"rm -rf %s; rm -rf %s\"", dir, tmpShell), host)
|
||
if err != nil {
|
||
logrus.Warnf("[%s] remove temp dir: %s failed: %v", host.Name, dir, err)
|
||
@@ -175,7 +174,7 @@ func clearUserTempDir(conn ssh.Connection, host *kkv1alpha1.HostCfg) {
|
||
|
||
func prepareUserTempDir(conn ssh.Connection, host *kkv1alpha1.HostCfg) error {
|
||
// scp to tmp file
|
||
- dir := getCopyDefaultDir(host.User)
|
||
+ dir := api.GetUserTempDir(host.User)
|
||
var sb strings.Builder
|
||
sb.WriteString("sudo -E /bin/sh -c \"")
|
||
sb.WriteString(fmt.Sprintf("mkdir -p %s", dir))
|
||
@@ -191,18 +190,11 @@ func prepareUserTempDir(conn ssh.Connection, host *kkv1alpha1.HostCfg) error {
|
||
return nil
|
||
}
|
||
|
||
-func getCopyDefaultDir(user string) string {
|
||
- if user == "root" {
|
||
- return constants.DefaultRootCopyTempDirHome + "/temp"
|
||
- }
|
||
- return fmt.Sprintf(constants.DefaultUserCopyTempHomeFormat, user) + "/temp"
|
||
-}
|
||
-
|
||
func (ssh *SSHRunner) copyFile(src, dst string) error {
|
||
if ssh.Conn == nil {
|
||
return fmt.Errorf("[%s] SSH runner is not connected", ssh.Host.Name)
|
||
}
|
||
- tempDir := getCopyDefaultDir(ssh.Host.User)
|
||
+ tempDir := api.GetUserTempDir(ssh.Host.User)
|
||
// scp to tmp file
|
||
tempCpyFile := filepath.Join(tempDir, filepath.Base(src))
|
||
err := ssh.Conn.Scp(src, tempCpyFile)
|
||
@@ -248,7 +240,7 @@ func (ssh *SSHRunner) copyDir(srcDir, dstDir string) error {
|
||
logrus.Errorf("[%s] create cert tmp tar failed: %v", ssh.Host.Name, err)
|
||
return err
|
||
}
|
||
- tmpCpyDir := getCopyDefaultDir(ssh.Host.User)
|
||
+ tmpCpyDir := api.GetUserTempDir(ssh.Host.User)
|
||
tmpPkiFile := filepath.Join(tmpCpyDir, "pkg.tar")
|
||
// scp to user home directory
|
||
err = ssh.Copy(tmpPkgFile, tmpPkiFile)
|
||
--
|
||
2.25.1
|
||
|