FlinkによるWordCountの実装

3600 ワード


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet text = env.fromElements("To be, or not to be,--that is the question:--",
                                                "Whether 'tis nobler in the mind to suffer",
                                                "The slings and arrows of outrageous fortune",
                                                "Or to take arms against a sea of troubles,");
        DataSet> counts = text.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String s, Collector> collector) throws Exception {
                String[] tokens = s.toLowerCase().split("\\s+");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1);
        counts.print();


    }
}