FlinkSQLテスト文

17974 ワード

kafka11ToElastic

-- Source table for the Elasticsearch scenario: a 'kafka11' connector table
-- bound to the topic 'soulmatch.user.messagelogic' with four columns
-- (uid, service_name, msg_type, sessionid) and parallelism '4'.
-- NOTE(review): offsetReset='latest' presumably starts from the newest
-- offsets and topicIsPattern='false' presumably treats the topic as a
-- literal name rather than a pattern -- confirm against the connector docs.
CREATE TABLE kafka11SourceTable (
    uid          BIGINT,
    service_name VARCHAR,
    msg_type     DOUBLE,
    sessionid    VARCHAR
) WITH (
    type = 'kafka11',
    bootstrapServers = 'c1-dsj-kafka102.bj:9092,c1-dsj-kafka103.bj:9092,c1-dsj-kafka104.bj:9092,c1-dsj-kafka105.bj:9092,c1-dsj-kafka106.bj:9092,c1-dsj-kafka107.bj:9092,c1-dsj-kafka108.bj:9092',
    offsetReset = 'latest',
    topic = 'soulmatch.user.messagelogic',
    parallelism = '4',
    topicIsPattern = 'false'
);

-- Sink table of type 'elasticsearch' writing to index 'sql_es_test'
-- (estype 'ymd', cluster 'es') at es.data02.dsj.inkept.cn:9300.
-- Declares the same four columns and types as the Kafka source table.
-- NOTE(review): id='0' presumably selects the column position used as the
-- document id -- confirm against the connector docs.
CREATE TABLE elasticSinkTable (
    uid          BIGINT,
    service_name VARCHAR,
    msg_type     DOUBLE,
    sessionid    VARCHAR
) WITH (
    type = 'elasticsearch',
    address = 'es.data02.dsj.inkept.cn:9300',
    cluster = 'es',
    estype = 'ymd',
    index = 'sql_es_test',
    id = '0',
    parallelism = '4'
);


-- Feed the Elasticsearch sink from the Kafka source, selecting the four
-- columns in the order the sink declares them.
INSERT INTO elasticSinkTable
SELECT
    uid,
    service_name,
    msg_type,
    sessionid
FROM kafka11SourceTable;


kafka11ToHbase

-- Source table for the HBase scenario: a 'kafka11' connector table bound to
-- the topic 'soulmatch.user.messagelogic', exposing uid, service_name,
-- msg_type and sessionid with parallelism '4'.
-- NOTE(review): the meaning of offsetReset / topicIsPattern is defined by
-- the connector, not by this file -- confirm against the connector docs.
CREATE TABLE kafka11SourceTable (
    uid          BIGINT,
    service_name VARCHAR,
    msg_type     DOUBLE,
    sessionid    VARCHAR
) WITH (
    type = 'kafka11',
    bootstrapServers = 'c1-dsj-kafka102.bj:9092,c1-dsj-kafka103.bj:9092,c1-dsj-kafka104.bj:9092,c1-dsj-kafka105.bj:9092,c1-dsj-kafka106.bj:9092,c1-dsj-kafka107.bj:9092,c1-dsj-kafka108.bj:9092',
    offsetReset = 'latest',
    topic = 'soulmatch.user.messagelogic',
    parallelism = '4',
    topicIsPattern = 'false'
);

-- Sink table of type 'hbase' targeting the HBase table 'test_flink', located
-- through the three-node ZooKeeper quorum below with parent znode '/hbase'.
-- The row key is configured as 'uid'.
-- NOTE(review): the 'cf:' prefix on every column presumably names the HBase
-- column family -- confirm against the connector docs.
CREATE TABLE hbaseSinkTable (
    cf:uid          BIGINT,
    cf:service_name VARCHAR,
    cf:msg_type     DOUBLE,
    cf:sessionid    VARCHAR
) WITH (
    type = 'hbase',
    zookeeperQuorum = 'zk01.cdh6.dsj.inkept.cn:2181,zk02.cdh6.dsj.inkept.cn:2181,zk03.cdh6.dsj.inkept.cn:2181',
    zookeeperParent = '/hbase',
    tableName = 'test_flink',
    rowKey = 'uid'
);

-- Feed the HBase sink from the Kafka source; the four selected columns line
-- up positionally with the sink's four declared columns.
INSERT INTO hbaseSinkTable
SELECT
    uid,
    service_name,
    msg_type,
    sessionid
FROM kafka11SourceTable;


kafka11ToKafka11

-- Source table for the Kafka-to-Kafka scenario: a 'kafka11' connector table
-- bound to the topic 'soulmatch.user.messagelogic', exposing uid,
-- service_name, msg_type and sessionid with parallelism '4'.
-- NOTE(review): the meaning of offsetReset / topicIsPattern is defined by
-- the connector, not by this file -- confirm against the connector docs.
CREATE TABLE kafka11SourceTable (
    uid          BIGINT,
    service_name VARCHAR,
    msg_type     DOUBLE,
    sessionid    VARCHAR
) WITH (
    type = 'kafka11',
    bootstrapServers = 'c1-dsj-kafka102.bj:9092,c1-dsj-kafka103.bj:9092,c1-dsj-kafka104.bj:9092,c1-dsj-kafka105.bj:9092,c1-dsj-kafka106.bj:9092,c1-dsj-kafka107.bj:9092,c1-dsj-kafka108.bj:9092',
    offsetReset = 'latest',
    topic = 'soulmatch.user.messagelogic',
    parallelism = '4',
    topicIsPattern = 'false'
);

-- Sink table of type 'kafka11' bound to the topic 'test_wangy3' on the same
-- broker list as the source, with parallelism '4'. Declares the same four
-- columns and types as the source table; unlike the source it sets neither
-- offsetReset nor topicIsPattern.
CREATE TABLE kafka11SinkTable (
    uid          BIGINT,
    service_name VARCHAR,
    msg_type     DOUBLE,
    sessionid    VARCHAR
) WITH (
    type = 'kafka11',
    bootstrapServers = 'c1-dsj-kafka102.bj:9092,c1-dsj-kafka103.bj:9092,c1-dsj-kafka104.bj:9092,c1-dsj-kafka105.bj:9092,c1-dsj-kafka106.bj:9092,c1-dsj-kafka107.bj:9092,c1-dsj-kafka108.bj:9092',
    topic = 'test_wangy3',
    parallelism = '4'
);

-- Feed the Kafka sink from the Kafka source, selecting the four columns in
-- the order the sink declares them.
INSERT INTO kafka11SinkTable
SELECT
    uid,
    service_name,
    msg_type,
    sessionid
FROM kafka11SourceTable;

kafka11ToMysql

-- Source table for the MySQL scenario: a 'kafka11' connector table bound to
-- the topic 'soulmatch.user.messagelogic' with parallelism '4'.
-- Unlike the earlier scenarios, all four columns are declared as varchar.
-- NOTE(review): the meaning of offsetReset / topicIsPattern is defined by
-- the connector, not by this file -- confirm against the connector docs.
CREATE TABLE kafka11SourceTable (
    uid          VARCHAR,
    service_name VARCHAR,
    msg_type     VARCHAR,
    sessionid    VARCHAR
) WITH (
    type = 'kafka11',
    bootstrapServers = 'c1-dsj-kafka102.bj:9092,c1-dsj-kafka103.bj:9092,c1-dsj-kafka104.bj:9092,c1-dsj-kafka105.bj:9092,c1-dsj-kafka106.bj:9092,c1-dsj-kafka107.bj:9092,c1-dsj-kafka108.bj:9092',
    offsetReset = 'latest',
    topic = 'soulmatch.user.messagelogic',
    parallelism = '4',
    topicIsPattern = 'false'
);

-- Sink table of type 'mysql' writing to the table 'mysqlSinkTable' in the
-- 'sentosa_b' database named in the JDBC url, with parallelism '1'.
-- All four columns are varchar, matching this scenario's Kafka source table.
-- NOTE(review): the database user name and password are stored here in
-- plaintext; consider moving them out of this file and rotating them.
CREATE TABLE mysqlSinkTable (
    uid          VARCHAR,
    service_name VARCHAR,
    msg_type     VARCHAR,
    sessionid    VARCHAR
) WITH (
    type = 'mysql',
    url = 'jdbc:mysql://rm-2zezlm9sjrx104p6e.mysql.rds.aliyuncs.com:3306/sentosa_b',
    userName = 'sentosa_w',
    password = '1assdaA2xxW3',
    tableName = 'mysqlSinkTable',
    parallelism = '1'
);

-- Feed the MySQL sink from the Kafka source. This statement was missing:
-- the scenario declared both tables but never connected them, unlike every
-- other scenario in this file.
INSERT INTO mysqlSinkTable
SELECT
    uid,
    service_name,
    msg_type,
    sessionid
FROM kafka11SourceTable;

kafka11ToRedis

-- Source table for the Redis scenario: a 'kafka11' connector table bound to
-- the topic 'soulmatch.user.messagelogic' with parallelism '4'.
-- All four columns are declared as varchar.
-- NOTE(review): the meaning of offsetReset / topicIsPattern is defined by
-- the connector, not by this file -- confirm against the connector docs.
CREATE TABLE kafka11SourceTable (
    uid          VARCHAR,
    service_name VARCHAR,
    msg_type     VARCHAR,
    sessionid    VARCHAR
) WITH (
    type = 'kafka11',
    bootstrapServers = 'c1-dsj-kafka102.bj:9092,c1-dsj-kafka103.bj:9092,c1-dsj-kafka104.bj:9092,c1-dsj-kafka105.bj:9092,c1-dsj-kafka106.bj:9092,c1-dsj-kafka107.bj:9092,c1-dsj-kafka108.bj:9092',
    offsetReset = 'latest',
    topic = 'soulmatch.user.messagelogic',
    parallelism = '4',
    topicIsPattern = 'false'
);

-- Sink table of type 'redis' at 10.111.151.54:6381, database '1', with
-- tableName 'sinktoredis' and uid declared as the primary key.
-- The table was previously named mysqlSinkTable (a leftover from the MySQL
-- scenario) although it is a Redis sink; it is renamed to redisSinkTable
-- here and in the INSERT below so the name matches the backend.
-- NOTE(review): redisType='1' presumably selects the Redis deployment mode
-- -- confirm against the connector docs.
-- NOTE(review): the Redis password is stored here in plaintext; consider
-- moving it out of this file and rotating it.
CREATE TABLE redisSinkTable (
    uid          VARCHAR,
    service_name VARCHAR,
    msg_type     VARCHAR,
    sessionid    VARCHAR,
    PRIMARY KEY (uid)
) WITH (
    type = 'redis',
    url = '10.111.151.54:6381',
    password = 'data_usp:InkePasswd',
    database = '1',
    redisType = '1',
    tableName = 'sinktoredis'
);

-- Feed the Redis sink from the Kafka source, selecting the four columns in
-- the order the sink declares them.
INSERT INTO redisSinkTable
SELECT
    uid,
    service_name,
    msg_type,
    sessionid
FROM kafka11SourceTable;