EMR+Lambdaでバッチ処理を自動化する(Lambdaアプリの作成)


LambdaからEMRを起動するための クソ コードになります。

記述言語

  • java8

Lambdaが呼び出す場所

EMR+Lambdaでバッチ処理を自動化する(Lambdaの準備編)でLambdaが実行するコード指定に

パッケージ.クラス::メソッド

# 今回は, hoge.Hoge::run のようになります

があったんですが、実際の中身になります。

public class Hoge {

    /**
     * 実行
     * @param data
     * @param context
     * @return
     */
    public String run(final Map<String, String> data, final Context context) {

        //EMR起動(後述)

        "complete!"
    }

第1引数がMap<String, String>なのは、Jsonが来る想定になっています。Jsonがネストする場合、Map<String, Object>が正解かも。

Stringで受けてしまうとエラーが起こるっぽいので注意

EMRの起動について

EMRを使ったことがある方は既にお分かりの方が多いと思いますが、EMRを設定するためには

  • EMRが利用するソフトウェア(Hadoop, Sparkなど)の設定
  • EMRが立ち上げるEC2インスタンスの設定
  • それぞれのセキュリティの設定
  • EMRで何をするかのStepの設定

の4つ程度に分かれるとおもいます。
それぞれどのように設定していくか、紹介します。

EMRが利用するソフトウェア(Hadoop, Sparkなど)の設定

RunJobFlowRequest job = new RunJobFlowRequest();
job.withName("EMRの名前");

// 現在私は、emr-5.0.0を使いたいので "emr-5.0.0" と、入れています。
job.withReleaseLabel("EMRのバージョン");

job.withLogUri("ログを排出するS3のパス s3://~~~");
job.withServiceRole("EMRに設定するIamRole");
job.withJobFlowRole("EMRが起動するEC2に設定するIamRole");
job.withVisibleToAllUsers(true);
job.withInstances("EMRが立ち上げるEC2インスタンスの設定(後述)");
job.withApplications("EMRに入れるソフトウェア");
job.withConfigurations("EMRに入れるソフトウェアの設定");
    }

  • EMRに入れるソフトウェア

    私の場合は、Hadoop + Spark + Gangliaを入れたかったので以下のようにしています。

    private static List<Application> buildApplications() {
        List<Application> apps = new ArrayList<>();
        apps.add(new Application().withName("Hadoop"));
        apps.add(new Application().withName("Spark"));
        apps.add(new Application().withName("Ganglia"));
        return apps;
    }
    
  • EMRに入れるソフトウェアの設定

    Sparkのおまじないとして、以下を入れています。

    [{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}]
    

    java上では以下のように記述します。

    final Configuration configuration = new Configuration();
    configuration.setClassification("spark");
    
    final HashMap<String, String> map = new HashMap<>();
    map.put("maximizeResourceAllocation", "true");
    configuration.setProperties(map);
    
    return configuration;
    
    

EMRが立ち上げるEC2インスタンスの設定、およびセキュリティ

final JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig();
instanceConfig.withEc2KeyName("key pair名");
instanceConfig.withEc2SubnetId("EC2を立ち上げるサブネット名");
instanceConfig.withEmrManagedMasterSecurityGroup("マスターのSecurityGroup");
instanceConfig.withEmrManagedSlaveSecurityGroup("コア、タスクのSecurityGroup");
instanceConfig.withAdditionalMasterSecurityGroups("マスターへの追加のセキュリティグループ");
instanceConfig.withAdditionalSlaveSecurityGroups("コア、タスクの追加のセキュリティグループ");
instanceConfig.withKeepJobFlowAliveWhenNoSteps("Jobがない時、EMRを落とすかどうか 落とすならfalse");
instanceConfig.withInstanceGroups("インスタンス自体の設定(後述)");
instanceConfig.withTerminationProtected("インスタンスの保護");
  • インスタンス自体の設定に関して

    final List<InstanceGroupConfig> result = new ArrayList<>();
    InstanceGroupConfig masterInstance = new InstanceGroupConfig()
                .withName("マスターノード")
                .withInstanceRole(InstanceRoleType.MASTER)
                .withInstanceCount(1)
                .withInstanceType("インスタンスタイプ") //インスタンスタイプ
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.14");
        result.add(masterInstance);
    
    InstanceGroupConfig coreInsetance = new InstanceGroupConfig()
                .withName("コアノード")
                .withInstanceRole(InstanceRoleType.CORE)
                .withInstanceCount(1)
                .withInstanceType("インスタンスタイプ")
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.14");
        result.add(coreInsetance);
    
        InstanceGroupConfig taskInsetance = new InstanceGroupConfig()
                .withName("タスクノード")
                .withInstanceRole(InstanceRoleType.TASK)
                .withInstanceCount(4)
                .withInstanceType("インスタンスタイプ")
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.14");
        result.add(taskInsetance);
        return result;
    

EMRで何をするかのStepの設定

私の場合は、Sparkアプリケーションで分散集計処理をするので
以下のようなコマンドをStepとして実行させます。

spark-submit --deploy-mode cluster --class my.Job s3://spark-app.jar 引数1 引数2 ...

これをJavaのコードにする場合は、以下となります。

final String[] args = {
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--class", "my.Job",
                    "s3://spark-app.jar", 
                    "引数1", "引数2"
            };

final StepConfig stepConfig = new StepConfig()
                    .withName("step名")
                    .withActionOnFailure("終了時の振る舞い")
                    .withHadoopJarStep(new HadoopJarStepConfig()
                            .withJar("command-runner.jar")
                            .withArgs(args));

return stepConfig;

EMR起動

先ほど作ったEMRの設定をするインスタンスに,Stepを追加します

// RunJobFlowRequest job = new RunJobFlowRequest();
// ・・・

job.withSteps(steps);

以下のコードでEMRが起動します

final AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
emr.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1));

emr.runJobFlow(job);