flink DataSet接続mongosクラスタmongo-hadoopを使用
3462 ワード
flinkの公式例は簡単で、久しぶりに更新されました.
mongos本番クラスタには認証権限が必要です.ドキュメントのヒントに従ってルールを構成します.ここでoutputはテストライブラリに届いているので、あまり構成されていません.
ここでは単一スレッド読み出しであり,構成による並列読み出しが可能かどうかは不明である.
mongo-hadoopドキュメントmongo-hadoop:Authentication
mongos本番クラスタには認証権限が必要です.ドキュメントのヒントに従ってルールを構成します.ここでoutputはテストライブラリに届いているので、あまり構成されていません.
ここでは単一スレッド読み出しであり,構成による並列読み出しが可能かどうかは不明である.
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import example.flink.KeySelector.RecordSeclectId;
import example.flink.mapFunction.BSONMapToRecord;
import example.flink.reduceFunction.KeyedGroupReduce;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
public class MongoSet {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
Job inputJob = Job.getInstance();
//inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection");
//inputJob.getConfiguration().set("mongo.auth.uri", "mongodb://root:rootpw@mongos01:port,mongos02:port,mongos03:port/admin");
inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection?&authMechanism=SCRAM-SHA-1&authSource=admin&readPreference=secondary");
inputJob.getConfiguration().set("mongo.input.split.read_shard_chunks", "true");
inputJob.getConfiguration().set("mongo.input.split.create_input_splits", "false");
inputJob.getConfiguration().set("mongo.input.split_size","16");
inputJob.getConfiguration().set("mongo.input.query", "{'createDateTime': {\"$lte\":{\"$date\":\"2019-05-27T00:00:00.000Z\"}, \"$gte\":{\"$date\":\"2010-03-17T00:00:00.000Z\"}}}");
inputJob.getConfiguration().set("mongo.input.fields", "{\"Id\":\"1\",\"saleType\":\"1\",\"saleNum\":\"1\",\"createDateTime\":\"1\"}");
HadoopInputFormat
mongo-hadoopドキュメントmongo-hadoop:Authentication