SparkローカルサブミットにおけるVolcanoスケジューリング対応方案
6754 ワード
Volcanoクライアントvcctlの再構築
1.1新規コマンドライン
vcctlにコマンドラインを追加
1.2コマンドラインルートおよび命令の作成
1.3 yamlファイルを構築し、ローカルjarパッケージをファイルサーバにアップロードする
yamlファイル構造体の定義は次のとおりです。
1.4 webhookを変更し、volcanoがラベルを含むリクエストをインターセプトできるようにする
1.1新規コマンドライン
vcctlにコマンドラインを追加
// Register the spark and spark-operator command trees on the vcctl root
// command; the builders are evaluated and attached in this order.
rootCmd.AddCommand(buildSpark(), buildSparkOperator())
1.2コマンドラインルートおよび命令の作成
// buildSparkOperator assembles the "spark-operator" command group: a parent
// command with a single child whose Run handler calls
// spark_operator.RunSparkOperatorSubmit and passes the result to checkError.
//
// NOTE(review): the child command uses the same Use string as its parent
// ("spark-operator"); confirm whether a distinct name such as "submit" was
// intended.
func buildSparkOperator() *cobra.Command {
	// Child command that performs the actual submission.
	submit := &cobra.Command{
		Use:   "spark-operator",
		Short: "spark operator",
		Run: func(cmd *cobra.Command, args []string) {
			err := spark_operator.RunSparkOperatorSubmit()
			checkError(cmd, err)
		},
	}
	// Flags are registered on the child command, not on the group.
	spark_operator.InitSparkOperatorFlags(submit)

	// Parent command that only groups the child.
	group := &cobra.Command{
		Use:   "spark-operator",
		Short: "spark-operator cmd",
	}
	group.AddCommand(submit)
	return group
}
1.3 yamlファイルを構築し、ローカルjarパッケージをファイルサーバにアップロードする
func RunSparkOperatorSubmit() error {
//
// /opt/spark-examples_2.11-2.4.4.jar
filePathSplit := strings.Split(cf.FilePath, "/")
cf.FileName = filePathSplit[len(filePathSplit)-1]
//
sf.Spec.MainApplicationFile = "local:///opt/spark/examples/target/scala-2.11/jars/" + cf.FileName
sf.Spec.Volumes.Volume = []Volume{{Name: cf.VolumeName, HostPath: HostPath{cf.HostPath, cf.HostPathType}}}
//
sf.Spec.Driver.Labels = map[string]string{cf.DriverLabel: cf.DriverLabelValue, "odin.k8s.io/spark": "true", "odin.io/filename": cf.FileName, "odin.registry/addr": "10.180.210.196"}
sf.Spec.Driver.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.DriverVolumeMountName, MountPath: cf.DriverVolumeMountPath}}
sf.Spec.Executor.Labels = map[string]string{cf.ExecutorLabel: cf.ExecutorLabelValue, "odin.k8s.io/spark": "true", "odin.io/filename": cf.FileName, "odin.registry/addr": "10.180.210.196"}
sf.Spec.Executor.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.ExecutorVolumeMountName, MountPath: cf.ExecutorVolumeMountPath}}
// yaml
fs, err := yaml.Marshal(&sf)
if err != nil {
println(err.Error())
}
// yaml
f, err := os.Create(sf.Metadata.Name + ".yaml")
if err != nil {
fmt.Println(err)
}
// api
rmVolume := regexp.MustCompile("volume:
")
rmVolumeMount := regexp.MustCompile("volumeMount:
")
yamlString := string(fs)
yamlString = rmVolume.ReplaceAllString(yamlString, "")
yamlString = rmVolumeMount.ReplaceAllString(yamlString, "")
//
_, err = f.WriteString(yamlString)
if err != nil {
fmt.Println(err)
f.Close()
}
// jar
uploadFile(cf.FilePath, "http://10.180.210.37:33332/upload")
//
cmd := exec.Command("/bin/bash", "-c", "kubectl apply -f "+f.Name())
output, err := cmd.Output()
if err != nil {
return err
}
fmt.Printf("Execute Shell:%s finished with output:
%s", cmd, string(output))
return err
}
yamlファイル構造体の定義は次のとおりです。
// sparkOperatorFlags is the in-memory form of the SparkApplication manifest
// that is marshalled to YAML and applied with kubectl.
//
// Every field, including the nested anonymous structs, carries an explicit
// yaml tag. Assuming the gopkg.in/yaml marshaller (TODO confirm against the
// file's imports), an untagged field is emitted under its lowercased name, so
// RestartPolicy and VolumeMounts would otherwise become "restartpolicy" and
// "volumemounts" instead of the camelCase keys.
//
// The Volume and VolumeMount slices sit under extra "volume"/"volumeMount"
// keys; RunSparkOperatorSubmit strips those key lines from the YAML text.
type sparkOperatorFlags struct {
	ApiVersion string `yaml:"apiVersion"`
	Kind       string `yaml:"kind"`
	Metadata   struct {
		Name      string `yaml:"name"`
		Namespace string `yaml:"namespace"`
	} `yaml:"metadata"`
	Spec struct {
		Types               string `yaml:"type"`
		Mode                string `yaml:"mode"`
		Image               string `yaml:"image"`
		ImagePullPolicy     string `yaml:"imagePullPolicy"`
		MainClass           string `yaml:"mainClass"`
		MainApplicationFile string `yaml:"mainApplicationFile"`
		SparkVersion        string `yaml:"sparkVersion"`
		BatchScheduler      string `yaml:"batchScheduler"`
		RestartPolicy       struct {
			Types string `yaml:"type"`
		} `yaml:"restartPolicy"`
		Volumes struct {
			Volume []Volume `yaml:"volume"`
		} `yaml:"volumes"`
		Driver struct {
			Cores          int               `yaml:"cores"`
			CoreLimit      string            `yaml:"coreLimit"`
			Memory         string            `yaml:"memory"`
			Labels         map[string]string `yaml:"labels"`
			ServiceAccount string            `yaml:"serviceAccount"`
			VolumeMounts   struct {
				VolumeMount []VolumeMount `yaml:"volumeMount"`
			} `yaml:"volumeMounts"`
		} `yaml:"driver"`
		Executor struct {
			Cores        int               `yaml:"cores"`
			Instances    int               `yaml:"instances"`
			Memory       string            `yaml:"memory"`
			Labels       map[string]string `yaml:"labels"`
			VolumeMounts struct {
				VolumeMount []VolumeMount `yaml:"volumeMount"`
			} `yaml:"volumeMounts"`
		} `yaml:"executor"`
	} `yaml:"spec"`
}
// VolumeMount is one mount entry of a driver or executor: the name of the
// volume and the path it is mounted at.
type VolumeMount struct {
	// Name is serialised under the "name" key.
	Name string `yaml:"name"`
	// MountPath is serialised under the "mountPath" key.
	MountPath string `yaml:"mountPath"`
}
// Volume is one named volume entry of the manifest, backed by a host path.
type Volume struct {
	// Name is serialised under the "name" key.
	Name string `yaml:"name"`
	// HostPath is serialised under the "hostPath" key.
	HostPath HostPath `yaml:"hostPath"`
}
// HostPath describes the host directory or file behind a Volume. The field
// order matters: the struct is also built positionally as HostPath{path, type}.
type HostPath struct {
	// Path is serialised under the "path" key.
	Path string `yaml:"path"`
	// Types is serialised under the "type" key.
	Types string `yaml:"type"`
}
1.4 webhookを変更し、volcanoがラベルを含むリクエストをインターセプトできるようにする
// Scheduler defaults and Odin-specific settings declared for the pod webhook.
// The three *_KEY values are the same strings that the submit command sets as
// label keys on the driver and executor.
const (
	// DefaultQueue constant stores the name of the queue as "default".
	DefaultQueue = "default"
	// defaultSchedulerName is the scheduler name, "volcano".
	defaultSchedulerName = "volcano"

	// INIT_CONTAINER_NAME is the container name "spark-init".
	INIT_CONTAINER_NAME = "spark-init"

	// ODIN_FILE_SERVER_ADDR is the file server address; the original source
	// carried "odin-file-server" as a commented-out alternative.
	ODIN_FILE_SERVER_ADDR = "10.180.210.37" // "odin-file-server"
	// ODIN_FILE_SERVER_PORT is the file server port.
	ODIN_FILE_SERVER_PORT = 80

	// ODIN_FILE_DOWNLOAD_KEY is the label key holding the jar file name.
	ODIN_FILE_DOWNLOAD_KEY = "odin.io/filename"
	// ODIN_IMAGE_REGISTRY_ADDR_KEY is the label key holding the registry address.
	ODIN_IMAGE_REGISTRY_ADDR_KEY = "odin.registry/addr"
	// ODIN_CONFIRM_SPARK_APP_KEY is the label key set to "true" by the submit command.
	ODIN_CONFIRM_SPARK_APP_KEY = "odin.k8s.io/spark"

	// ODIN_APP_EXEC_PATH is the directory prefix used for mainApplicationFile.
	ODIN_APP_EXEC_PATH = "/opt/spark/examples/target/scala-2.11/jars/"
	// ODIN_BASE_IMAGE is an image reference; its use is not visible in this section.
	ODIN_BASE_IMAGE = "library/centos-ssh:latest"
)
// init registers the pod admission service declared in this file with the
// webhook router when the package is loaded.
func init() {
router.RegisterAdmission(service)
}
// service is the admission service for pods: it is served at "/pods/mutate",
// handled by MutatePods, and described by a MutatingWebhookConfiguration with
// a single webhook named "mutatepod.volcano.sh".
var service = &router.AdmissionService{
Path: "/pods/mutate",
// Handler invoked for requests on Path.
Func: MutatePods,
// Webhook registration: only CREATE operations on core-group ("") v1 pods
// are matched.
MutatingConfig: &whv1beta1.MutatingWebhookConfiguration{
Webhooks: []whv1beta1.MutatingWebhook{{
Name: "mutatepod.volcano.sh",
Rules: []whv1beta1.RuleWithOperations{
{
Operations: []whv1beta1.OperationType{whv1beta1.Create},
Rule: whv1beta1.Rule{
APIGroups: []string{""},
APIVersions: []string{"v1"},
Resources: []string{"pods"},
},
},
},
}},
},
}