Flink Demoテスト------Kafka接続(Flink 1.8;Hadoop 3.0)


pomファイル
 Flink Quickstart Job
    http://www.myorganization.org

    
        
            apache.snapshots
            Apache Development Snapshot Repository
            https://repository.apache.org/content/repositories/snapshots/
            
                false
            
            
                true
            
        
    

    
        UTF-8
        yyyyMMddHHmmss
        2.11.11
        2.11
        3.0.0
        1.8.0
    

    

        
            org.apache.flink
            flink-scala_${scala.binary.version}
            ${flink.version}
            provided
        

        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
            provided
        

        
            org.scala-lang
            scala-library
            ${scala.version}
            provided
        

        
            org.apache.flink
            flink-connector-kafka-0.10_${scala.binary.version}
            ${flink.version}
        

        
            org.apache.kafka
            kafka_2.11
            1.0.0
        

        
            mysql
            mysql-connector-java
            5.1.38
        

        
            org.slf4j
            slf4j-log4j12
            1.7.7
            runtime
        

        
            log4j
            log4j
            1.2.17
            runtime
        

    

    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                3.0.0
                
                    
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    org.apache.flink:force-shading
                                    com.google.code.findbugs:jsr305
                                    org.slf4j:*
                                    log4j:*
                                
                            
                            
                                
                                    
                                    *:*
                                    
                                        META-INF/*.SF
                                        META-INF/*.DSA
                                        META-INF/*.RSA
                                    
                                
                            
                            
                                
                                    com.zm.kafkaTs.KafkaOtherWriteTS
                                
                            
                        
                    
                
            

            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.1
                
                    1.8
                    1.8
                
            

            
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
            


            
                org.codehaus.mojo
                build-helper-maven-plugin
                1.7
                
                    
                    
                        add-source
                        generate-sources
                        
                            add-source
                        
                        
                            
                                src/main/scala
                            
                        
                    
                    
                    
                        add-test-source
                        generate-test-sources
                        
                            add-test-source
                        
                        
                            
                                src/test/scala
                            
                        
                    
                
            
        
    

    
        
            add-dependencies-for-IDEA

            
                
                    idea.version
                
            

            
                
                    org.apache.flink
                    flink-scala_${scala.binary.version}
                    ${flink.version}
                    compile
                
                
                    org.apache.flink
                    flink-streaming-scala_${scala.binary.version}
                    ${flink.version}
                    compile
                
                
                    org.scala-lang
                    scala-library
                    ${scala.version}
                    compile
                
            
        
    

コード#コード#


import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaOtherWriteTS {

  def main(args: Array[String]): Unit = {
//kafka 
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val kafkaProps: Properties = new Properties()
    kafkaProps.setProperty("bootstrap.servers","***:9092")
    kafkaProps.setProperty("group.id",args(0))
    kafkaProps.setProperty("auto.offset.reset","earliest")

    val consumer = new FlinkKafkaConsumer010[String]("****",new SimpleStringSchema,kafkaProps)

    val transaction: DataStream[String] = env.addSource(consumer)

    transaction.print()
    env.execute(this.getClass.getSimpleName)

  }

}