SparkStreaming、kafka、mysql統合

2619 ワード

前の段はプロジェクトに接触しました。需要はmonoの増分データです。kafkaの生産者として、sparkStreamingを消費者として、最終的に処理してからmysqlに書き込みます。以前はkafkaとsparkStreamingについて知っていましたので、私が一番よく知っているjavaを使ってプロジェクトを完成しました。
部分コードは以下の通りです。
public class JsonStream {


    public static void main(String[] args) throws Exception {

        //   kafka     
        String brokers = "localhost:9092";
        String groupId = "1";
        String topics = "test";

        // Create context with a 30 seconds batch interval

        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(
                "GetDataStream");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));

        Set topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map kafkaParams = new HashMap<>();

        //brokers    
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        //groupId
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        //          
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //latest, earliest, none          
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create direct kafka stream with brokers and topics
        JavaInputDStream> datas = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        //                        
        jssc.checkpoint("/Users/lihongji/Desktop/check");

        JavaDStream lines = datas.map(ConsumerRecord::value);
        lines.map(m -> {
            String jsonstr = m;
            JSONObject jsonObject = JSON.parseObject(m);
            

            //              
            List> dataset;

            List warnList = new ArrayList<>();

            dataset = DBUtil.query("select DISTINCT keywords FROM alertset");


            //      ,    ,      ,     ,          。
            getInfo(jsonObject);


            return m;

        }).print();

        jssc.start();
        jssc.awaitTermination();

    }
プロジェクトで発生した主な問題は、kafkaが送信したjsonデータを解析処理し、常にマッチングした後にmysqlに挿入し、効率を向上させるために、挿入時にjdbc batchを選択して効率を向上させることです。