Flink入門：DataStream のデータソース API の実験

22677 ワード

目次

  • 直接入力形式
  • fromElements
  • fromCollection
  • Socket形式
  • ファイル形式
  • カスタム形式
  • 以下では、DataStream にデータを流し込むデータソース API について実験を行う。

    直接入力形式


    fromElements

    step1: ElementsInput.java
    package org.myorg.quickstart;
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * Demonstrates {@code StreamExecutionEnvironment#fromElements}: the given strings become a
     * bounded DataStream whose elements are printed to stdout.
     *
     * @author ryan create on 2019/1/6
     **/
    public class ElementsInput {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Each argument passed to fromElements(...) becomes one element of the stream.
            final DataStreamSource<String> sentences = streamEnv.fromElements(
                    "hey, man, this is collection are you ok?",
                    "hello flink, this is string");

            // print() writes every element to stdout. When the sink runs with parallelism > 1,
            // each line is prefixed with "<subtask>> " and the line order may vary, e.g.
            //   2> hello flink, this is string
            //   1> hey, man, this is collection are you ok?
            sentences.print();

            // The pipeline defined above only runs once the job is submitted here.
            streamEnv.execute();
        }
    }
    
    
    step2:
    mvn clean package
    mvn exec:java -Dexec.mainClass=org.myorg.quickstart.ElementsInput 
    
    

    コンソールへの出力（並列実行のため、行の順序や「N>」の番号は実行ごとに変わることがある）
    1> hey, man, this is collection are you ok?
    2> hello flink, this is string
    

    fromCollection

    step1: CollectionInput.java
    package org.myorg.quickstart;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    
    /**
     * Demonstrates {@code StreamExecutionEnvironment#fromCollection}: an in-memory Java
     * collection is turned into a bounded DataStream and each element is printed to stdout.
     *
     * @author ryan
     **/
    public class CollectionInput {
        public static void main(String[] args) throws Exception {
            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<String> text = env.fromCollection(new ArrayList<String>(Arrays.asList("Hi", "legotime", "ok")));

            // Print through a real sink instead of a side effect inside map(). The previous
            // map(...) emitted null records just to call System.out.println, and it ran with the
            // environment's default parallelism behind the non-parallel collection source, so
            // elements could be spread over several subtasks and printed in any order.
            // A parallelism-1 sink keeps the input order and prints without the "N> " prefix.
            text.print().setParallelism(1);

            // submit the job; nothing above executes until this call
            env.execute();
        }
    }
    
    
    step2:
    mvn clean package
    mvn exec:java -Dexec.mainClass=org.myorg.quickstart.CollectionInput 
    
    

    出力
    Hi
    legotime
    ok
    

    Socket形式

    step1: SocketWindowWordCount.java
    package org.myorg.quickstart;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    /**
     * Implements a streaming windowed version of the "WordCount" program.
     *
     * <p>This program connects to a server socket and reads strings from the socket. The easiest
     * way to try this out is to open a text server (at port 12345) using the netcat tool via
     *
     * <pre>
     * nc -l 12345
     * </pre>
     *
     * <p>and run this example with the hostname and the port as arguments.
     */
    @SuppressWarnings("serial")
    public class SocketWindowWordCount {

        /**
         * Entry point.
         *
         * @param args {@code --port <port>} (required) and {@code --hostname <hostname>}
         *     (optional, defaults to {@code localhost})
         * @throws Exception if building or executing the Flink job fails
         */
        public static void main(String[] args) throws Exception {
            // the host and the port to connect to
            final String hostname;
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
                port = params.getInt("port");
            } catch (Exception e) {
                // --port missing or unparsable: print usage and stop instead of failing later.
                System.err.println("No port specified. Please run 'SocketWindowWordCount "
                        + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                        + "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l <port>' and "
                        + "type the input text into the command line");
                return;
            }

            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // get input data by connecting to the socket; "\n" turns every input line into one
            // record (an empty delimiter never splits the incoming text into lines)
            DataStream<String> text = env.socketTextStream(hostname, port, "\n");

            // parse the data, group it, window it, and aggregate the counts
            DataStream<WordWithCount> windowCounts = text
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String value, Collector<WordWithCount> out) {
                            for (String word : value.split("\\s")) {
                                // leading or repeated whitespace yields empty tokens; skip them
                                if (!word.isEmpty()) {
                                    out.collect(new WordWithCount(word, 1L));
                                }
                            }
                        }
                    })
                    // field expression: works because WordWithCount is a POJO with a public "word"
                    .keyBy("word")
                    // NOTE(review): timeWindow() follows the environment's time characteristic;
                    // this demo assumes processing time (no timestamps/watermarks are assigned).
                    .timeWindow(Time.seconds(5))
                    .reduce(new ReduceFunction<WordWithCount>() {
                        @Override
                        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                            return new WordWithCount(a.word, a.count + b.count);
                        }
                    });

            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1);

            env.execute("Socket Window WordCount");
        }

        // ------------------------------------------------------------------------

        /** Data type for words with count. */
        public static class WordWithCount {

            /** The word this entry counts. */
            public String word;
            /** Number of occurrences of {@link #word} seen so far in the window. */
            public long count;

            /** Public no-arg constructor, needed for Flink to treat this class as a POJO. */
            public WordWithCount() {
            }

            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }

            @Override
            public String toString() {
                return word + ": " + count;
            }
        }
    }
    step2: socket
    nc -l 12345
    
    step3:
    mvn clean package
    mvn exec:java -Dexec.mainClass=org.myorg.quickstart.SocketWindowWordCount -Dexec.args="--hostname 127.0.0.1  --port 12345"
    
    
    step4: nc（socket）に文字列を入力
    ➜  tmp nc -l 12345
    this is socket
    pretty test
    

    得られた出力結果
    this: 1
    is: 1
    socket: 1
    pretty: 1
    test: 1
    

    ファイル形式

    step1: FileInput.java
    package org.myorg.quickstart;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    /**
     * Demonstrates {@code StreamExecutionEnvironment#readTextFile}: a text file is read line by
     * line as a DataStream and every line is printed to stdout.
     *
     * <p>The file path may be given as the first program argument; without arguments the
     * original default {@code tmp.txt} is used.
     *
     * @author ryan
     **/
    public class FileInput {
        public static void main(String[] args) throws Exception {
            // optional first argument overrides the default input path
            final String path = args.length > 0 ? args[0] : "tmp.txt";

            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> text = env.readTextFile(path);

            // Print through a real sink instead of a side effect inside map() that emitted null
            // records. A single sink instance prints lines without the "N> " subtask prefix.
            text.print().setParallelism(1);

            // submit the job; nothing above executes until this call
            env.execute();
        }
    }
    
    
    step2:
    mvn clean package
    echo "this is tmp file " > tmp.txt
    mvn exec:java -Dexec.mainClass=org.myorg.quickstart.FileInput
    

    出力結果:
    this is tmp file 
    

    カスタム形式


    カスタム形式のデータソースとしては Kafka などの外部システムを利用でき、対応する connector を通じて DataStream に接続します。