import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import java.util.HashMap;
import java.util.Properties;
/**
 * Kafka source example.
 *
 * Reads a Kafka topic with the Flink Kafka consumer and starts each
 * partition from an explicitly specified offset.
 */
public class StreamingKafkaSource {
public static void main(String[] args) throws Exception {
// Set up the Flink streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds with exactly-once guarantees
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Fail a checkpoint attempt if it does not finish within 60 seconds
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Leave at least 500 ms between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Allow only one checkpoint to be in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Keep externalized checkpoints when the job is cancelled so the job can be restored later
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// State backend (optional): keep checkpoint state in RocksDB on HDFS instead of the JobManager heap
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));
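// Sketch of an alternative (not in the original): if the RocksDB dependency is not wanted,
// a plain filesystem backend can write checkpoints to the same HDFS path. Assumes
// org.apache.flink.runtime.state.filesystem.FsStateBackend is on the classpath.
//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));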
// Kafka consumer configuration
String topic = "test";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","192.168.200.10:9092");
prop.setProperty("group.id","con1");
// Explicit start offsets, one entry per Kafka partition of the topic
HashMap<KafkaTopicPartition, Long> kafkaTopicPartitionMap = new HashMap<>();
kafkaTopicPartitionMap.put(new KafkaTopicPartition(topic, 0), 10L);
kafkaTopicPartitionMap.put(new KafkaTopicPartition(topic, 1), 0L);
kafkaTopicPartitionMap.put(new KafkaTopicPartition(topic, 2), 0L);
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
// Choose where the consumer starts reading:
//myConsumer.setStartFromGroupOffsets(); // default: resume from the offsets committed for this consumer group
myConsumer.setStartFromSpecificOffsets(kafkaTopicPartitionMap); // start from the offsets specified above
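// Other start positions offered by the Flink Kafka consumer, shown here for reference only:
//myConsumer.setStartFromEarliest(); // ignore committed offsets and read every partition from the beginning
//myConsumer.setStartFromLatest();   // ignore committed offsets and read only records produced from now on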
DataStreamSource<String> text = env.addSource(myConsumer);
/* Uncomment to inspect each record and deliberately slow consumption down for debugging:
text.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
System.out.println("received: " + value);
Thread.sleep(10000000); // block so that offsets advance very slowly
return value;
}
});*/
// Print the stream to stdout with a single printing task
text.print().setParallelism(1);
env.execute("StreamingKafkaSource");
}
}