oracleデータアクセスclickhouse

12984 ワード

oracleデータアクセスclickhouse
oracleデータアクセスには2つの側面があります.1つはCDCで、ログに基づいてデータの変化をキャプチャし、削除変更を含む.二つ目は、インクリメンタルデータの準リアルタイムインポートであり、自己増加idまたは時間フィールドに依存し、CDCに比べて配置が簡単であり、適用シーンもインクリメンタルデータにのみ適用される.ここでは、インクリメンタルデータへのアクセスについてのみ説明します.
一、clickhouse単機取付
  • OpenSSLのアップグレード
  • rpm -Uvh openssl-1.0.2k-12.el7.x86_64.rpm

  • Unixodbcのインストール
  • rpm -ivh unixODBC-2.3.1-11.el7.x86_64.rpm

  • clickhouseのインストール
  • http://repo.yandex.ru/clickhouse/rpm/stable/x86_64/
  • rpm -ivh clickhouse*


  • 二、oracle->flume->kafka
    kafka connectのjdbc source接続oracleを使用して、接続に失敗し続けます.しかしmysqlに接続することは可能であり、しばらく問題点は見つからない.
  • flumeインストール
  • (略)hdpにおけるFlume 1.5.2
  • を使用
  • kafkaインストール
  • (略)hdpにおけるkafka 0.10.1
  • を用いる
  • flume-ng-sql-source
  • ダウンロードアドレス:https://github.com/keedio/flume-ng-sql-source.git,http://repo.red-soft.biz/repos/clickhouse/stable/el7/
  • コンパイル:
  • clickhouseの書き込みが便利で、デフォルトの区切り記号を','から't';に変更します.
  • 以降、jsonフォーマットの追加が考えられる.

  • flume-ng-sql-source-1.4.3.jarをflumeのlibディレクトリの下に置く
  • oracle
  • 建表

  • create table flume_ng_sql_source (
    id varchar2(32) primary key,
    msg varchar2(32),
    createTime date not null
    );

  • データ挿入

  • insert into flume_ng_sql_source(id,msg,createTime) values('1','Test increment Data',to_date('2017-08-01 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('2','Test increment Data',to_date('2017-08-02 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('3','Test increment Data',to_date('2017-08-03 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('4','Test increment Data',to_date('2017-08-04 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('5','Test increment Data',to_date('2017-08-05 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('6','Test increment Data',to_date('2017-08-06 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    commit;

  • ojdbc 6.jarをflumeのlibディレクトリの下に置く
  • 新規flume-sql.conf
  • /usr/local/flumeディレクトリにflume-sql.conf
    agentTest.channels = channelTest
    agentTest.sources = sourceTest
    agentTest.sinks = sinkTest
    
    ###########sql source#################
    
    
    # For each Test of the sources, the type is defined
    
    agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
    agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@10.8.7.96:1521/ora11g
    
    # Hibernate Database connection properties
    
    agentTest.sources.sourceTest.hibernate.connection.user = taizhou
    agentTest.sources.sourceTest.hibernate.connection.password = 123456
    agentTest.sources.sourceTest.hibernate.connection.autocommit = true
    agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
    agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
    agentTest.sources.sourceTest.run.query.delay=10000
    agentTest.sources.sourceTest.enclose.by.quotes = false
    
    agentTest.sources.sourceTest.status.file.path = /usr/local/flume
    agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
    
    # Custom query
    
    agentTest.sources.sourceTest.start.from = 2017-07-31 07:06:20
    agentTest.sources.sourceTest.custom.query = SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC
    
    agentTest.sources.sourceTest.batch.size = 1000
    agentTest.sources.sourceTest.max.rows = 1000
    agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
    agentTest.sources.sourceTest.hibernate.c3p0.max_size=10
    
    ##############################
    
    agentTest.channels.channelTest.type = memory
    agentTest.channels.channelTest.capacity = 1000
    agentTest.channels.channelTest.transactionCapacity = 1000
    agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
    agentTest.channels.channelTest.byteCapacity = 1600000
    
    agentTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
    agentTest.sinks.sinkTest.topic = test13
    agentTest.sinks.sinkTest.brokerList = 10.8.7.85:6667
    agentTest.sinks.sinkTest.requiredAcks = 1
    agentTest.sinks.sinkTest.batchSize = 20
    agentTest.sinks.sinkTest.channel = channelTest
    
    agentTest.sinks.sinkTest.channel = channelTest
    agentTest.sources.sourceTest.channels=channelTest
  • を新規作成
  • 注意時間フィールドは1番目です.
  • $@$は引用符付きで、start.from値は引用符を必要としません.その値が$@$を入力すると引用符が付きます.

  • flumeの起動
  • flume/binディレクトリの下
  • flume-ng agent –conf conf –conf-file/usr/local/flume/flume-sql.conf –name agentTest -Dflume.root.logger=INFO,console

  • kafkaにデータが書き込まれているかどうかを確認
  • kafka-console-consumer.sh –zookeeper localhost:2181 –topic TestTopic
  • データはタブ分割、引用符なし

  • ステータスファイルの表示
  • /usr/local/flume/agentTest.sqlSource.status
  • LastIndex値が最後にインポートされた最大時間フィールド
  • 最初から再インポートするには、このファイルを削除する必要があります


  • 三、oracle->flume->clickhouse
    flumeのsourceおよびchannelはアクセスkafkaと同様でsinkを変更するだけでよい.
  • flume-clickhouse-sink
  • https://reviews.apache.org/r/50692/diff/1#2
  • コンパイル
  • flume-clickhouse-sink-1.5.2.jarをflumeのlibディレクトリの下に置く
  • clickhouse建表
  • リモート・アクセスのオープン
  • /etc/clickhouse-server/config.xml

  • <listen_host>::1listen_host>
    <listen_host>  iplisten_host>
    clickhouse-client -m
  • CREATE TABLE flume_ng_sql_source
    (
    createtime DateTime,
    id UInt32, 
    msg String
    )  engine = MergeTree PARTITION BY toYYYYMMDD(createtime) order by id SETTINGS index_granularity = 8192;
  • 注意フィールド順序はcustom.queryと同じ
  • DateTimeとDateの違い

  • 新規ch.conf
  • /usr/local/flumeディレクトリでch.conf
  • を新規作成
  • agentTest.channels = channelTest
    agentTest.sources = sourceTest
    agentTest.sinks = sinkTest
    
    ###########sql source#################
    
    
    # For each Test of the sources, the type is defined
    
    agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
    agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@10.8.7.96:1521/ora11g
    
    # Hibernate Database connection properties
    
    agentTest.sources.sourceTest.hibernate.connection.user = taizhou
    agentTest.sources.sourceTest.hibernate.connection.password = 123456
    agentTest.sources.sourceTest.hibernate.connection.autocommit = true
    agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
    agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
    agentTest.sources.sourceTest.run.query.delay=10000
    agentTest.sources.sourceTest.enclose.by.quotes = false
    
    agentTest.sources.sourceTest.status.file.path = /usr/local/flume
    agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
    
    # Custom query
    
    agentTest.sources.sourceTest.start.from = 2017-07-31 07:06:20
    agentTest.sources.sourceTest.custom.query = SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC
    
    agentTest.sources.sourceTest.batch.size = 1000
    agentTest.sources.sourceTest.max.rows = 1000
    agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
    agentTest.sources.sourceTest.hibernate.c3p0.max_size=10
    
    ##############################
    
    agentTest.channels.channelTest.type = memory
    agentTest.channels.channelTest.capacity = 1000
    agentTest.channels.channelTest.transactionCapacity = 1000
    agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
    agentTest.channels.channelTest.byteCapacity = 1600000
    
    agentTest.sinks.sinkTest.type = org.apache.flume.sink.clickhouse.ClickHouseSink
    agentTest.sinks.sinkTest.host = http://10.8.7.96
    agentTest.sinks.sinkTest.port = 8123
    agentTest.sinks.sinkTest.database = default
    agentTest.sinks.sinkTest.table = flume_ng_sql_source
    agentTest.sinks.sinkTest.batchSize = 3000
    agentTest.sinks.sinkTest.format = TabSeparated
    
    agentTest.sinks.sinkTest.channel = channelTest
    agentTest.sources.sourceTest.channels=channelTest

  • flumeの起動
  • flume/binディレクトリの下
  • flume-ng agent –conf conf –conf-file/usr/local/flume/ch.conf –name agentTest -Dflume.root.logger=INFO,console


  • clickhouseデータの表示
  • クエリーデータ:
  • select * from flume_ng_sql_source order by id;

  • データディレクトリの表示:
  • /var/lib/clickhouse/data/default/flume_ng_sql_source/
  • パーティションごとに1つのディレクトリ

  • 質問
  • oracleからのデータの読み込みに成功し、clickhouseへの書き込みに失敗すると、ステータスファイルのlastindex値も変更されます.
  • このプロセスをoracleに変更するかどうか->flume->kafka->flume->clickhouse
  • 参照先:https://www.cnblogs.com/yangcx666/p/8723849.html ​