EMR+Lambdaでバッチ処理を自動化する(Lambdaアプリの作成)
LambdaからEMRを起動するための クソ コードになります。
記述言語
- java8
Lambdaが呼び出す場所
EMR+Lambdaでバッチ処理を自動化する(Lambdaの準備編)でLambdaが実行するコード指定に
パッケージ.クラス::メソッド
# 今回は, hoge.Hoge::run のようになります
があったんですが、実際の中身になります。
public class Hoge {
/**
* 実行
* @param data
* @param context
* @return
*/
public String run(final Map<String, String> data, final Context context) {
//EMR起動(後述)
"complete!"
}
第1引数がMap<String, String>
なのは、Jsonが来る想定になっています。Jsonがネストする場合、Map<String, Object>
が正解かも。
String
で受けてしまうとエラーが起こるっぽいので注意
EMRの起動について
EMRを使ったことがある方は既にお分かりの方が多いと思いますが、EMRを設定するためには
- EMRが利用するソフトウェア(Hadoop, Sparkなど)の設定
- EMRが立ち上げるEC2インスタンスの設定
- それぞれのセキュリティの設定
- EMRで何をするかのStepの設定
の4つ程度に分かれるとおもいます。
それぞれどのように設定していくか、紹介します。
EMRが利用するソフトウェア(Hadoop, Sparkなど)の設定
RunJobFlowRequest job = new RunJobFlowRequest();
job.withName("EMRの名前");
// 現在私は、emr-5.0.0を使いたいので "emr-5.0.0" と、入れています。
job.withReleaseLabel("EMRのバージョン");
job.withLogUri("ログを排出するS3のパス s3://~~~");
job.withServiceRole("EMRに設定するIamRole");
job.withJobFlowRole("EMRが起動するEC2に設定するIamRole");
job.withVisibleToAllUsers(true);
job.withInstances("EMRが立ち上げるEC2インスタンスの設定(後述)");
job.withApplications("EMRに入れるソフトウェア");
job.withConfigurations("EMRに入れるソフトウェアの設定");
}
-
EMRに入れるソフトウェア
私の場合は、Hadoop + Spark + Gangliaを入れたかったので以下のようにしています。
private static List<Application> buildApplications() { List<Application> apps = new ArrayList<>(); apps.add(new Application().withName("Hadoop")); apps.add(new Application().withName("Spark")); apps.add(new Application().withName("Ganglia")); return apps; }
-
EMRに入れるソフトウェアの設定
Sparkのおまじないとして、以下を入れています。
[{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}]
java上では以下のように記述します。
final Configuration configuration = new Configuration(); configuration.setClassification("spark"); final HashMap<String, String> map = new HashMap<>(); map.put("maximizeResourceAllocation", "true"); configuration.setProperties(map); return configuration;
EMRが立ち上げるEC2インスタンスの設定、およびセキュリティ
final JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig();
instanceConfig.withEc2KeyName("key pair名");
instanceConfig.withEc2SubnetId("EC2を立ち上げるサブネット名");
instanceConfig.withEmrManagedMasterSecurityGroup("マスターのSecurityGroup");
instanceConfig.withEmrManagedSlaveSecurityGroup("コア、タスクのSecurityGroup");
instanceConfig.withAdditionalMasterSecurityGroups("マスターへの追加のセキュリティグループ");
instanceConfig.withAdditionalSlaveSecurityGroups("コア、タスクの追加のセキュリティグループ");
instanceConfig.withKeepJobFlowAliveWhenNoSteps("Jobがない時、EMRを落とすかどうか 落とすならfalse");
instanceConfig.withInstanceGroups("インスタンス自体の設定(後述)");
instanceConfig.withTerminationProtected("インスタンスの保護");
-
インスタンス自体の設定に関して
final List<InstanceGroupConfig> result = new ArrayList<>(); InstanceGroupConfig masterInstance = new InstanceGroupConfig() .withName("マスターノード") .withInstanceRole(InstanceRoleType.MASTER) .withInstanceCount(1) .withInstanceType("インスタンスタイプ") //インスタンスタイプ .withMarket(MarketType.SPOT) .withBidPrice("0.14"); result.add(masterInstance); InstanceGroupConfig coreInsetance = new InstanceGroupConfig() .withName("コアノード") .withInstanceRole(InstanceRoleType.CORE) .withInstanceCount(1) .withInstanceType("インスタンスタイプ") .withMarket(MarketType.SPOT) .withBidPrice("0.14"); result.add(coreInsetance); InstanceGroupConfig taskInsetance = new InstanceGroupConfig() .withName("タスクノード") .withInstanceRole(InstanceRoleType.TASK) .withInstanceCount(4) .withInstanceType("インスタンスタイプ") .withMarket(MarketType.SPOT) .withBidPrice("0.14"); result.add(taskInsetance); return result;
EMRで何をするかのStepの設定
私の場合は、Sparkアプリケーションで分散集計処理をするので
以下のようなコマンドをStepとして実行させます。
spark-submit --deploy-mode cluster --class my.Job s3://spark-app.jar 引数1 引数2 ...
これをJavaのコードにする場合は、以下となります。
final String[] args = {
"spark-submit",
"--deploy-mode", "cluster",
"--class", "my.Job",
"s3://spark-app.jar",
"引数1", "引数2"
};
final StepConfig stepConfig = new StepConfig()
.withName("step名")
.withActionOnFailure("終了時の振る舞い")
.withHadoopJarStep(new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs(args));
return stepConfig;
EMR起動
先ほど作ったEMRの設定をするインスタンスに,Stepを追加します
// RunJobFlowRequest job = new RunJobFlowRequest();
// ・・・
job.withSteps(steps);
以下のコードでEMRが起動します
final AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
emr.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1));
emr.runJobFlow(job);
Author And Source
この問題について(EMR+Lambdaでバッチ処理を自動化する(Lambdaアプリの作成)), 我々は、より多くの情報をここで見つけました https://qiita.com/RyujiKawazoe/items/534f4b069ebea2f7d26c著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .