セルフ構築mysql-canal-kafkaチャネル


このcanalはjavaを集積して、煩雑なインストールを必要としないで、直接いくつかのパラメータを配置してmysql-canal-kafkaを実現することができます
Mysql環境構成
一、binlogフォーマット
mysqlパラメータは以下のように調整すればよい.
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

参照構成(my.cnf):[mysqld] character_set_server = utf8 init_connect         =  'SET NAMES utf8' server-id            =  1 log_bin              = mysql-bin binlog_format        = row expire_logs_days     =  30 sql_mode             = NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
上記の構成を変更したら、flush logsコマンドをもう一度実行したほうがいいです.
二、super privilege user
オフラインテストのデフォルトデータベースはrootで、rootがあればrootでいいです.
Kafka環境構成
このインストールは比較的に簡単で、ネット上ですべてあって、かすめて、canalはkafkaのbrokerアドレスとtopicを構成する必要があります
Zookeeper環境構成
オンラインで使用するzookeeperは3.4.6バージョンで、どこかに配置し、zooを微調整します.cfg、起動すればいいです.
Canal環境構成
一、配置
canal-liteをダウンロードします.xxx(添付ファイルが大きすぎてアップロードできません.必要があればメッセージを残してください)ターゲットの位置に着いて、以下のコマンドを実行します.
cat canal-lite.x* > canal.tar.gz
tar -zxvf canal.tar.gz

canalはjavaバージョン>=1.7で実行する必要があります.開閉方式はそれぞれsh bin/startup.shsh bin/stop.shである.
二、構成変更
canalで変更する必要がある構成は、次の2つの場所にあります.
conf/canal.properties
conf/example/instance.properties
ここでexampleはdestination名であり、他のcanalとzookeeperを混在させる場合はdestinationが衝突しないことを保証します.
conf/canal.propertiesが変更するパラメータ
1
2
3
4
5 # canal , canal.port =  3406   # canal zookeeper canal.zkServers =  127.0 . 0.1 : 2181
conf/destination/instance.propertiesが変更するパラメータ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 # canal mysql slave id, mysql id canal.instance.mysql.slaveId =  123456   # canal mysql / / canal.instance.master.address =  127.0 . 0.1 : 3306 canal.instance.dbUsername = root canal.instance.dbPassword =  123456   # mapping database to topicstrategy # format: db1:topic1,db2:topic2,... canal.instance.topicMapping        = orders:db_test,test:db_test
  # partition strategy canal.instance.partitionByPrimaryEnable =  true
  # partition by a specific column, only effective when partitionByPrimary = False # format: name of the column canal.instance.partitionColumn     =   # kafka broker address canal.brokerAddress                =