Flink Demoテスト------Kafka接続(Flink 1.8;Hadoop 3.0)
9660 ワード
pomファイル
コード
Flink Quickstart Job
http://www.myorganization.org
apache.snapshots
Apache Development Snapshot Repository
https://repository.apache.org/content/repositories/snapshots/
false
true
UTF-8
yyyyMMddHHmmss
2.11.11
2.11
3.0.0
1.8.0
org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}
provided
org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
provided
org.scala-lang
scala-library
${scala.version}
provided
org.apache.flink
flink-connector-kafka-0.10_${scala.binary.version}
${flink.version}
org.apache.kafka
kafka_2.11
1.0.0
mysql
mysql-connector-java
5.1.38
org.slf4j
slf4j-log4j12
1.7.7
runtime
log4j
log4j
1.2.17
runtime
org.apache.maven.plugins
maven-shade-plugin
3.0.0
package
shade
org.apache.flink:force-shading
com.google.code.findbugs:jsr305
org.slf4j:*
log4j:*
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
com.zm.kafkaTs.KafkaOtherWriteTS
org.apache.maven.plugins
maven-compiler-plugin
3.1
1.8
1.8
net.alchim31.maven
scala-maven-plugin
3.2.2
compile
testCompile
org.codehaus.mojo
build-helper-maven-plugin
1.7
add-source
generate-sources
add-source
src/main/scala
add-test-source
generate-test-sources
add-test-source
src/test/scala
add-dependencies-for-IDEA
idea.version
org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}
compile
org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
compile
org.scala-lang
scala-library
${scala.version}
compile
コード
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
/**
 * Demo Flink streaming job: consumes string records from a Kafka topic with
 * [[FlinkKafkaConsumer010]] and prints every record to standard output.
 *
 * Usage: `KafkaOtherWriteTS <group.id>`
 *
 * The broker address (`***:9092`) and the topic name (`****`) are masked
 * placeholders in this listing and presumably must be replaced with real
 * values before the job can connect.
 */
object KafkaOtherWriteTS {

  /**
   * Builds the Kafka consumer configuration.
   *
   * @param groupId value stored under the Kafka `group.id` key
   * @return properties holding `bootstrap.servers`, `group.id` and `auto.offset.reset`
   */
  private def kafkaProperties(groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "***:9092")
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", "earliest")
    props
  }

  /**
   * Job entry point.
   *
   * @param args command-line arguments; `args(0)` is the Kafka consumer group id
   * @throws IllegalArgumentException if no consumer group id is supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
    // raised half-way through job construction.
    require(args.nonEmpty, "Missing argument. Usage: KafkaOtherWriteTS <group.id>")
    val groupId = args(0)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Checkpoint with an interval of 5000 (milliseconds per the Flink API), exactly-once mode.
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka source: each record is deserialized as a plain string.
    val consumer =
      new FlinkKafkaConsumer010[String]("****", new SimpleStringSchema, kafkaProperties(groupId))
    val transaction: DataStream[String] = env.addSource(consumer)
    transaction.print()

    // The job is named after this object's runtime class.
    env.execute(this.getClass.getSimpleName)
  }
}