SparkStreaming、kafka、mysql統合
2619 ワード
前の段はプロジェクトに接触しました。需要はmonoの増分データです。kafkaの生産者として、sparkStreamingを消費者として、最終的に処理してからmysqlに書き込みます。以前はkafkaとsparkStreamingについて知っていましたので、私が一番よく知っているjavaを使ってプロジェクトを完成しました。
部分コードは以下の通りです。
部分コードは以下の通りです。
public class JsonStream {
public static void main(String[] args) throws Exception {
// kafka
String brokers = "localhost:9092";
String groupId = "1";
String topics = "test";
// Create context with a 30 seconds batch interval
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(
"GetDataStream");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
Set topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map kafkaParams = new HashMap<>();
//brokers
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
//groupId
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//latest, earliest, none
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create direct kafka stream with brokers and topics
JavaInputDStream> datas = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
//
jssc.checkpoint("/Users/lihongji/Desktop/check");
JavaDStream lines = datas.map(ConsumerRecord::value);
lines.map(m -> {
String jsonstr = m;
JSONObject jsonObject = JSON.parseObject(m);
//
List
プロジェクトで発生した主な問題は、kafkaが送信したjsonデータを解析処理し、常にマッチングした後にmysqlに挿入し、効率を向上させるために、挿入時にjdbc batchを選択して効率を向上させることです。