oracleデータアクセスclickhouse
12984 ワード
oracleデータアクセスclickhouse
oracleデータアクセスには2つの側面があります.1つはCDCで、ログに基づいてデータの変化をキャプチャし、削除変更を含む.二つ目は、インクリメンタルデータの準リアルタイムインポートであり、自己増加idまたは時間フィールドに依存し、CDCに比べて配置が簡単であり、適用シーンもインクリメンタルデータにのみ適用される.ここでは、インクリメンタルデータへのアクセスについてのみ説明します.
一、clickhouse単機取付OpenSSLのアップグレード rpm -Uvh openssl-1.0.2k-12.el7.x86_64.rpm
Unixodbcのインストール rpm -ivh unixODBC-2.3.1-11.el7.x86_64.rpm
clickhouseのインストール http://repo.yandex.ru/clickhouse/rpm/stable/x86_64/ rpm -ivh clickhouse*
二、oracle->flume->kafka
kafka connectのjdbc source接続oracleを使用して、接続に失敗し続けます.しかしmysqlに接続することは可能であり、しばらく問題点は見つからない.flumeインストール (略)hdpにおけるFlume 1.5.2 を使用
kafkaインストール (略)hdpにおけるkafka 0.10.1 を用いる
flume-ng-sql-source ダウンロードアドレス:https://github.com/keedio/flume-ng-sql-source.git,http://repo.red-soft.biz/repos/clickhouse/stable/el7/ コンパイル: clickhouseの書き込みが便利で、デフォルトの区切り記号を','から't';に変更します. 以降、jsonフォーマットの追加が考えられる.
flume-ng-sql-source-1.4.3.jarをflumeのlibディレクトリの下に置く oracle 建表
データ挿入
ojdbc 6.jarをflumeのlibディレクトリの下に置く 新規flume-sql.conf /usr/local/flumeディレクトリにflume-sql.conf を新規作成注意時間フィールドは1番目です. $@$は引用符付きで、start.from値は引用符を必要としません.その値が$@$を入力すると引用符が付きます.
flumeの起動 flume/binディレクトリの下 flume-ng agent –conf conf –conf-file/usr/local/flume/flume-sql.conf –name agentTest -Dflume.root.logger=INFO,console
kafkaにデータが書き込まれているかどうかを確認 kafka-console-consumer.sh –zookeeper localhost:2181 –topic TestTopic データはタブ分割、引用符なし
ステータスファイルの表示 /usr/local/flume/agentTest.sqlSource.status LastIndex値が最後にインポートされた最大時間フィールド 最初から再インポートするには、このファイルを削除する必要があります
三、oracle->flume->clickhouse
flumeのsourceおよびchannelはアクセスkafkaと同様でsinkを変更するだけでよい. flume-clickhouse-sink https://reviews.apache.org/r/50692/diff/1#2 コンパイル flume-clickhouse-sink-1.5.2.jarをflumeのlibディレクトリの下に置く clickhouse建表 リモート・アクセスのオープン /etc/clickhouse-server/config.xml
注意フィールド順序はcustom.queryと同じ DateTimeとDateの違い
新規ch.conf /usr/local/flumeディレクトリでch.conf を新規作成
flumeの起動 flume/binディレクトリの下 flume-ng agent –conf conf –conf-file/usr/local/flume/ch.conf –name agentTest -Dflume.root.logger=INFO,console
clickhouseデータの表示 クエリーデータ: select * from flume_ng_sql_source order by id;
データディレクトリの表示: /var/lib/clickhouse/data/default/flume_ng_sql_source/ パーティションごとに1つのディレクトリ
質問 oracleからのデータの読み込みに成功し、clickhouseへの書き込みに失敗すると、ステータスファイルのlastindex値も変更されます. このプロセスをoracleに変更するかどうか->flume->kafka->flume->clickhouse 参照先:https://www.cnblogs.com/yangcx666/p/8723849.html
oracleデータアクセスには2つの側面があります.1つはCDCで、ログに基づいてデータの変化をキャプチャし、削除変更を含む.二つ目は、インクリメンタルデータの準リアルタイムインポートであり、自己増加idまたは時間フィールドに依存し、CDCに比べて配置が簡単であり、適用シーンもインクリメンタルデータにのみ適用される.ここでは、インクリメンタルデータへのアクセスについてのみ説明します.
一、clickhouse単機取付
二、oracle->flume->kafka
kafka connectのjdbc source接続oracleを使用して、接続に失敗し続けます.しかしmysqlに接続することは可能であり、しばらく問題点は見つからない.
create table flume_ng_sql_source (
id varchar2(32) primary key,
msg varchar2(32),
createTime date not null
);
insert into flume_ng_sql_source(id,msg,createTime) values('1','Test increment Data',to_date('2017-08-01 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('2','Test increment Data',to_date('2017-08-02 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('3','Test increment Data',to_date('2017-08-03 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('4','Test increment Data',to_date('2017-08-04 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('5','Test increment Data',to_date('2017-08-05 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('6','Test increment Data',to_date('2017-08-06 07:06:20','yyyy-mm-dd hh24:mi:ss'));
commit;
agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest
###########sql source#################
# For each Test of the sources, the type is defined
agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@10.8.7.96:1521/ora11g
# Hibernate Database connection properties
agentTest.sources.sourceTest.hibernate.connection.user = taizhou
agentTest.sources.sourceTest.hibernate.connection.password = 123456
agentTest.sources.sourceTest.hibernate.connection.autocommit = true
agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentTest.sources.sourceTest.run.query.delay=10000
agentTest.sources.sourceTest.enclose.by.quotes = false
agentTest.sources.sourceTest.status.file.path = /usr/local/flume
agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
# Custom query
agentTest.sources.sourceTest.start.from = 2017-07-31 07:06:20
agentTest.sources.sourceTest.custom.query = SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC
agentTest.sources.sourceTest.batch.size = 1000
agentTest.sources.sourceTest.max.rows = 1000
agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
agentTest.sources.sourceTest.hibernate.c3p0.max_size=10
##############################
agentTest.channels.channelTest.type = memory
agentTest.channels.channelTest.capacity = 1000
agentTest.channels.channelTest.transactionCapacity = 1000
agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
agentTest.channels.channelTest.byteCapacity = 1600000
agentTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
agentTest.sinks.sinkTest.topic = test13
agentTest.sinks.sinkTest.brokerList = 10.8.7.85:6667
agentTest.sinks.sinkTest.requiredAcks = 1
agentTest.sinks.sinkTest.batchSize = 20
agentTest.sinks.sinkTest.channel = channelTest
agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels=channelTest
三、oracle->flume->clickhouse
flumeのsourceおよびchannelはアクセスkafkaと同様でsinkを変更するだけでよい.
<listen_host>::1listen_host>
<listen_host> iplisten_host>
clickhouse-client -m CREATE TABLE flume_ng_sql_source
(
createtime DateTime,
id UInt32,
msg String
) engine = MergeTree PARTITION BY toYYYYMMDD(createtime) order by id SETTINGS index_granularity = 8192;
agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest
###########sql source#################
# For each Test of the sources, the type is defined
agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@10.8.7.96:1521/ora11g
# Hibernate Database connection properties
agentTest.sources.sourceTest.hibernate.connection.user = taizhou
agentTest.sources.sourceTest.hibernate.connection.password = 123456
agentTest.sources.sourceTest.hibernate.connection.autocommit = true
agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentTest.sources.sourceTest.run.query.delay=10000
agentTest.sources.sourceTest.enclose.by.quotes = false
agentTest.sources.sourceTest.status.file.path = /usr/local/flume
agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
# Custom query
agentTest.sources.sourceTest.start.from = 2017-07-31 07:06:20
agentTest.sources.sourceTest.custom.query = SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC
agentTest.sources.sourceTest.batch.size = 1000
agentTest.sources.sourceTest.max.rows = 1000
agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
agentTest.sources.sourceTest.hibernate.c3p0.max_size=10
##############################
agentTest.channels.channelTest.type = memory
agentTest.channels.channelTest.capacity = 1000
agentTest.channels.channelTest.transactionCapacity = 1000
agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
agentTest.channels.channelTest.byteCapacity = 1600000
agentTest.sinks.sinkTest.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agentTest.sinks.sinkTest.host = http://10.8.7.96
agentTest.sinks.sinkTest.port = 8123
agentTest.sinks.sinkTest.database = default
agentTest.sinks.sinkTest.table = flume_ng_sql_source
agentTest.sinks.sinkTest.batchSize = 3000
agentTest.sinks.sinkTest.format = TabSeparated
agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels=channelTest