Flank SQL実戦
4777 ワード
最近Flink SQLフローを研究しています。ここで簡単な実戦を書いてください。
背景
1分ごとに過去1時間の商品購入量を集計します。
データフォーマットは「"behavior":"cart"で、「itemid」:19で、「nowTime」:15621437553}です。
behavior:ユーザー行為のために、cart、pv、buyがあります。私達が欲しいのはbuyです。
コード
pojoクラスです。属性はbehavior、itemid、nowTimeです。
UDFToTime
これはカスタムのUDFで、timestampを東八区に転送する時間を担当しています。Flank SQLの時間は北京から8時間の差がありますので、その後必要があれば、変換できます。
1、現在Flinik SQLは流式処理でlimitをサポートしていません。order byに対して、timeだけを並べ替えると言いましたが、試してみました。成功していませんでした。サポートがあまりよくないです。私がよく使っていないかもしれません。
2、時間に対して、どのようにイベントタイムを使うかは、前のdatastreamに先に登録して、そしてテーブルを登録する時、対応する属性の後で .rowTimeです。これはイベントです。processingTimeを使うなら、レジストリの時、属性はそこに、proctime.proctimeを追加してください。この時、proctimeはシステムの現在の時間です。直接に使ってもいいです。
他の知識点については、ホームページをご覧ください。
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html
背景
1分ごとに過去1時間の商品購入量を集計します。
データフォーマットは「"behavior":"cart"で、「itemid」:19で、「nowTime」:15621437553}です。
behavior:ユーザー行為のために、cart、pv、buyがあります。私達が欲しいのはbuyです。
コード
import com.alibaba.fastjson.JSON;
import com.closeli.writetohbase.fun.SQLKEYPro;
import com.closeli.writetohbase.fun.UDFToTime;
import com.closeli.writetohbase.fun.UserWaterMark;
import com.closeli.writetohbase.pojo.UserData;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import java.util.Properties;
public class SQLTOKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties prop = new Properties();
prop.put("group.id","aaaaaaa");
prop.put("bootstrap.servers","node223:6667,node224:6667,node225:6667");
FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010<>("mydata", new SimpleStringSchema(), prop);
DataStreamSource dataSource = env.addSource(consumer);
// dataSource.print();
SingleOutputStreamOperator filterData = dataSource.map(new MapFunction() {
@Override
public UserData map(String value) throws Exception {
UserData userData = null;
try {
userData = JSON.parseObject(value, UserData.class);
} catch (Exception e) {
e.printStackTrace();
}
return userData;
}
}).filter(x -> x != null && x.getBehavior().equals("pv")).assignTimestampsAndWatermarks(new UserWaterMark());
// UDF,
tEnv.registerFunction("timetoformat",new UDFToTime());
tEnv.registerDataStream("userTable",filterData,"behavior,itemId,nowTime.rowTime");
// Table table = tEnv.sqlQuery("select url,count(1) from userTable group by TUMBLE(nowTime,INTERVAL '5' SECOND),url");
Table table = tEnv.sqlQuery("select itemId,count(1),timetoformat(HOP_END(nowTime, INTERVAL '10' SECOND, INTERVAL '30' MINUTE)) from userTable group by HOP(nowTime, INTERVAL '10' SECOND, INTERVAL '30' MINUTE),itemId");
// DataStream resultData = tEnv.toAppendStream(table, Types.TUPLE(Types.INT, Types.LONG ,Types.SQL_TIMESTAMP));
// DataStream resultData = tEnv.toAppendStream(table, Types.TUPLE(Types.INT, Types.LONG ,Types.LONG));
DataStream> resultData = tEnv.toAppendStream(table, TypeInformation.of(new TypeHint>() {}));
resultData.print();
env.execute("SQLTOKafka");
}
}
UserDatapojoクラスです。属性はbehavior、itemid、nowTimeです。
UDFToTime
これはカスタムのUDFで、timestampを東八区に転送する時間を担当しています。Flank SQLの時間は北京から8時間の差がありますので、その後必要があれば、変換できます。
import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;
public class UDFToTime extends ScalarFunction {
public Long eval(Timestamp s){
return s.getTime() + 28800000;
}
}
注意点1、現在Flinik SQLは流式処理でlimitをサポートしていません。order byに対して、timeだけを並べ替えると言いましたが、試してみました。成功していませんでした。サポートがあまりよくないです。私がよく使っていないかもしれません。
2、時間に対して、どのようにイベントタイムを使うかは、前のdatastreamに先に登録して、そしてテーブルを登録する時、対応する属性の後で .rowTimeです。これはイベントです。processingTimeを使うなら、レジストリの時、属性はそこに、proctime.proctimeを追加してください。この時、proctimeはシステムの現在の時間です。直接に使ってもいいです。
他の知識点については、ホームページをご覧ください。
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html