Sparkローカル提出Volcanoスケジューリングサポート方案

6754 ワード

Volcanoクライアントvcctlの再構築
1.1 新規コマンドライン
vcctlにコマンドラインを追加
// Register the spark and spark-operator sub-commands on the vcctl root command.
rootCmd.AddCommand(buildSpark())
rootCmd.AddCommand(buildSparkOperator())

1.2 コマンドラインルートおよび命令の作成
// buildSparkOperator wires up the `vcctl spark-operator` command tree and
// returns the parent command for registration on the root command.
func buildSparkOperator() *cobra.Command {

	// Parent command: groups all spark-operator related actions.
	sparkOperatorCmd := &cobra.Command{
		Use:   "spark-operator",
		Short: "spark-operator cmd",
	}

	// Submit sub-command. BUG FIX: this previously reused Use: "spark-operator",
	// which duplicates the parent command's name; it is the submit action
	// (it runs RunSparkOperatorSubmit), so it is invoked as
	// `vcctl spark-operator submit`.
	sparkSubmitCmd := &cobra.Command{
		Use:   "submit",
		Short: "spark operator",
		Run: func(cmd *cobra.Command, args []string) {
			checkError(cmd, spark_operator.RunSparkOperatorSubmit())
		},
	}
	// Register the command-line flags consumed by RunSparkOperatorSubmit.
	spark_operator.InitSparkOperatorFlags(sparkSubmitCmd)
	sparkOperatorCmd.AddCommand(sparkSubmitCmd)

	return sparkOperatorCmd
}

1.3 yamlファイルを構築し、ローカルjarパッケージをファイルサーバにコミットする
func RunSparkOperatorSubmit() error {

   //      
   //  /opt/spark-examples_2.11-2.4.4.jar
   filePathSplit := strings.Split(cf.FilePath, "/")

   cf.FileName = filePathSplit[len(filePathSplit)-1]

   //         
   sf.Spec.MainApplicationFile = "local:///opt/spark/examples/target/scala-2.11/jars/" + cf.FileName

   sf.Spec.Volumes.Volume = []Volume{{Name: cf.VolumeName, HostPath: HostPath{cf.HostPath, cf.HostPathType}}}

   //    
   sf.Spec.Driver.Labels = map[string]string{cf.DriverLabel: cf.DriverLabelValue, "odin.k8s.io/spark": "true", "odin.io/filename": cf.FileName, "odin.registry/addr": "10.180.210.196"}

   sf.Spec.Driver.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.DriverVolumeMountName, MountPath: cf.DriverVolumeMountPath}}

   sf.Spec.Executor.Labels = map[string]string{cf.ExecutorLabel: cf.ExecutorLabelValue, "odin.k8s.io/spark": "true", "odin.io/filename": cf.FileName, "odin.registry/addr": "10.180.210.196"}

   sf.Spec.Executor.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.ExecutorVolumeMountName, MountPath: cf.ExecutorVolumeMountPath}}

   //  yaml   
   fs, err := yaml.Marshal(&sf)
   if err != nil {
      println(err.Error())
   }

   //  yaml  
   f, err := os.Create(sf.Metadata.Name + ".yaml")
   if err != nil {
      fmt.Println(err)
   }

   //           api
   rmVolume := regexp.MustCompile("volume:
") rmVolumeMount := regexp.MustCompile("volumeMount:
") yamlString := string(fs) yamlString = rmVolume.ReplaceAllString(yamlString, "") yamlString = rmVolumeMount.ReplaceAllString(yamlString, "") // _, err = f.WriteString(yamlString) if err != nil { fmt.Println(err) f.Close() } // jar uploadFile(cf.FilePath, "http://10.180.210.37:33332/upload") // cmd := exec.Command("/bin/bash", "-c", "kubectl apply -f "+f.Name()) output, err := cmd.Output() if err != nil { return err } fmt.Printf("Execute Shell:%s finished with output:
%s", cmd, string(output)) return err }

yamlファイル構造体の定義は次のとおりです.
// sparkOperatorFlags mirrors the SparkApplication yaml document generated by
// RunSparkOperatorSubmit.
//
// NOTE(review): the Volumes/VolumeMounts wrapper structs introduce extra
// "volume:" / "volumeMount:" keys in the marshaled yaml; those lines are
// stripped with regexps after marshaling. Changing field names, order, or
// tags here changes the generated yaml — confirm against the post-processing
// before touching this type.
type sparkOperatorFlags struct {
   ApiVersion string `yaml:"apiVersion"`
   Kind       string `yaml:"kind"`
   // Application name (also used as the output yaml file name) and namespace.
   Metadata   struct {
      Name      string `yaml:"name"`
      Namespace string `yaml:"namespace"`
   }
   Spec struct {
      Types               string `yaml:"type"`
      Mode                string `yaml:"mode"`
      Image               string `yaml:"image"`
      ImagePullPolicy     string `yaml:"imagePullPolicy"`
      MainClass           string `yaml:"mainClass"`
      MainApplicationFile string `yaml:"mainApplicationFile"`
      SparkVersion        string `yaml:"sparkVersion"`
      // BatchScheduler selects the scheduler (e.g. "volcano").
      BatchScheduler      string `yaml:"batchScheduler"`
      RestartPolicy       struct {
         Types string `yaml:"type"`
      }
      // Wrapper around the volume list; the "volume:" key is removed after
      // marshaling.
      Volumes struct {
         Volume []Volume `yaml:"volume"`
      }
      // Driver pod settings, including the labels consumed by the webhook.
      Driver struct {
         Cores          int               `yaml:"cores"`
         CoreLimit      string            `yaml:"coreLimit"`
         Memory         string            `yaml:"memory"`
         Labels         map[string]string `yaml:"labels"`
         ServiceAccount string            `yaml:"serviceAccount"`
         VolumeMounts   struct {
            VolumeMount []VolumeMount `yaml:"volumeMount"`
         }
      }
      // Executor pod settings.
      Executor struct {
         Cores        int               `yaml:"cores"`
         Instances    int               `yaml:"instances"`
         Memory       string            `yaml:"memory"`
         Labels       map[string]string `yaml:"labels"`
         VolumeMounts struct {
            VolumeMount []VolumeMount `yaml:"volumeMount"`
         }
      }
   }
}

// VolumeMount names a volume and the path where it is mounted inside the
// driver/executor container.
type VolumeMount struct {
   Name      string `yaml:"name"`
   MountPath string `yaml:"mountPath"`
}

// Volume describes a pod volume backed by a host path.
type Volume struct {
   Name     string `yaml:"name"`
   HostPath HostPath `yaml:"hostPath"`
}

// HostPath is the hostPath volume source: a path on the node plus the
// Kubernetes hostPath type (yaml key "type").
// NOTE(review): callers construct this positionally (HostPath{path, type}),
// so the field order below must not change.
type HostPath struct {
   Path  string `yaml:"path"`
   Types string `yaml:"type"`
}

1.4 webhookを変更し、volcanoがラベルを含むリクエストをブロックできるようにする
const (
   // DefaultQueue constant stores the name of the queue as "default"
   DefaultQueue = "default"

   // defaultSchedulerName is the scheduler name handled by this webhook.
   defaultSchedulerName = "volcano"

   // INIT_CONTAINER_NAME is the name given to the injected init container.
   INIT_CONTAINER_NAME = "spark-init"
   // ODIN_FILE_SERVER_ADDR is the file-server address.
   // NOTE(review): hard-coded IP with the service name left in a trailing
   // comment — presumably should resolve "odin-file-server"; confirm.
   ODIN_FILE_SERVER_ADDR = "10.180.210.37"//"odin-file-server"
   ODIN_FILE_SERVER_PORT = 80
   // ODIN_FILE_DOWNLOAD_KEY is the pod label carrying the jar file name
   // (set by RunSparkOperatorSubmit as "odin.io/filename").
   ODIN_FILE_DOWNLOAD_KEY = "odin.io/filename"
   // ODIN_IMAGE_REGISTRY_ADDR_KEY is the pod label carrying the registry address.
   ODIN_IMAGE_REGISTRY_ADDR_KEY = "odin.registry/addr"
   // ODIN_CONFIRM_SPARK_APP_KEY marks a pod as belonging to a spark application.
   ODIN_CONFIRM_SPARK_APP_KEY = "odin.k8s.io/spark"
   // ODIN_APP_EXEC_PATH matches the jar directory used in MainApplicationFile.
   ODIN_APP_EXEC_PATH="/opt/spark/examples/target/scala-2.11/jars/"
   // ODIN_BASE_IMAGE is the base image referenced by the webhook.
   ODIN_BASE_IMAGE="library/centos-ssh:latest"
)

// init registers the pod-mutating admission service with the webhook router
// at package load time.
func init() {
   router.RegisterAdmission(service)
}

// service declares the MutatingWebhookConfiguration for pod mutation:
// CREATE requests for core/v1 pods are routed to MutatePods at /pods/mutate.
var service = &router.AdmissionService{
   Path: "/pods/mutate",
   // Handler invoked for each admission review request.
   Func: MutatePods,

   // Webhook registration: intercept pod CREATE operations only.
   MutatingConfig: &whv1beta1.MutatingWebhookConfiguration{
      Webhooks: []whv1beta1.MutatingWebhook{{
         Name: "mutatepod.volcano.sh",
         Rules: []whv1beta1.RuleWithOperations{
            {
               Operations: []whv1beta1.OperationType{whv1beta1.Create},
               Rule: whv1beta1.Rule{
                  APIGroups:   []string{""},
                  APIVersions: []string{"v1"},
                  Resources:   []string{"pods"},
               },
            },
         },
      }},
   },
}