OGGはMYSQL 5から.7 BigData(Kafka)への同期

8727 ワード

1説明
1.1ソフトウェアバージョン
Mysql:5.7.26ダウンロードアドレス
Mysql Ogg:12.2.0.2ダウンロードアドレス
BigData Ogg:12.3.2.1.1ダウンロードアドレス
2インストール
2.1 Mysql
(1)解凍して名前を変更
tar -zxvf mysql-5.7.26-1.el6.x86_64.rpm-bundle.tar.gz
mv mysql-5.7.26-linux-glibc2.12-x86_64/usr/local/mysql5.7
(2)Mysqlプロファイル(/etc/my.cnf)
[mysqld]
datadir=/usr/local/mysql5.7/data
socket=/var/lib/mysql/mysql.sock
user=mysql
symbolic-links=0
#######データベースタイムアウト時間
connect_timeout=388000
#######wait_timeout=388000
interactive_timeout=388000
############デフォルトのストレージエンジン
default-storage-engine=INNODB
innodb_buffer_pool_size=1073741824
################################################ogg構成
log-bin=mysql-bin#この行を追加すればok
binlog-format=ROW#選択rowモード
server_id=1#mysql replactionを構成するには定義が必要で、canalのslaveIdと繰り返すことはできません
[mysqld_safe]
log-error=/var/log/mysqld5.7.log
pid-file=/var/run/mysqld/mysqld5.7.pid
[client]
default-character-set=utf8
(3) Mysql.server(/usr/local/mysql5.7/support-files/mysql.server
basedir=/usr/local/mysql5.7
datadir=/usr/local/mysql5.7/data
(4)初期化
cd/usr/local/mysql5.7/bin
./mysqld--initialize->root初期パスワードが出力されますので、コピーしてください
(5)ファイル構成の起動
cp/usr/local/mysql5.7/support-files/mysql.server/etc/init.d/mysql5.7
(6)データベースの起動
services mysqld5.7 start
(7)sock異常の可能性
解決する
ln -s/var/lib/mysql/mysql.sock/tmp/mysql.sock
(8)Mysqlのpath
古いバージョンのバックアップ
cd/usr/bin
mkdir mysql_bak
mv mysql* mysql_bak;
5.7バージョンのPATHを確立
vim/etc/profile
export MYSQL_HOME=/usr/local/mysql5.7
export PATH=MYSQL_HOME/bin
(9)Mysql用rootログイン
mysql -uroot
前にコピーしたパスワードを入力
alter user root identified by ‘root’; -> rootパスワードの変更
(10)試験庫の作成
Mysql -uroot -proot <
Create user gg1 identified by gg1;
Create database gg1db default character set =utf8;
Grant all privileges on gg1db.* to gg1@localhost identified by ‘gg1’;
Flush privileges;
!
2.2ソース
(1)ggsユーザを確立し、ggsパケットをggsユーザの下にコピーする
user add ggs
Passwd ggs -> ggs
chown -R ggs:ggs/home/ggs
(2)oggディレクトリに解凍
su - ggs
mkdir ogg
uuzip 122022_ggs_Linux_x64_MySQL_64bit.zip
tar -xvf *tar -C ogg
(3) env.sh
su - ggs
touch env.sh
export OGG_HOME=$HOME/ogg
export PATH=OGG_HOME
export LD_LIBRARY_PATH=OGG_HOME
(4)oggの初期化
su - ggs
source env.sh
cd ogg
./ggsci
===>実行:create subdirs
(5)構成プロセス
su - ggs
source env.sh
cd ogg
./ggsci
===>実行:dblogin sourcedbgg1db@localhost:3306,userid gg1,password gg1;
===>実行:edit param mgr
===>入力:
port 7015
ACCESSRULE, PROG *, IPADDR *, ALLOW
===>保存終了
===>実行:edit param ext 1
===>入力:
EXTRACT ext1
setenv (MYSQL_HOME="/usr/local/mysql5.7")
dboptions host 192.168.102.3, connectionport 3306
tranlogoptions altlogdest/usr/local/mysql5.7/data/mysql-bin.index
SOURCEDB gg1db@localhost:3306,userid gg1,PASSWORD gg1
EXTTRAIL ./dirdat/et
table gg1db.*;
===>保存終了
===>実行:edit parm pump 1
===>入力:
EXTRACT pump1
SOURCEDB gg1db@localhost:3306,userid gg1,PASSWORD gg1
RMTHOST 192.168.102.3, MGRPORT 8015
RMTTRAIL ./dirdat/rt
table gg1db.*;
===>保存終了;
===>実行:add extract ext 1,tranlog,begin now
===>実行:add exttrail./dirdat/et, extract ext1
===>実行:add extract pump 1,exttrailsource./dirdat/et
===>実行:add rmttrail./dirdat/rt,extract pump1
(6)defの構成
su - ggs
source env.sh
cd ogg
./ggsci
===>実行:edit param defgen
===>入力:
defsfile ./dirdef/defgen.def
sourcedb gg1db@localhost:3306,userid gg1,password gg1
table gg1db.*;
===>保存終了
defgenを生成する.def
ggsciを終了します.
実行:./defgen paramfile ./dirprm/defgen.prm
===dirdefディレクトリの下にdefgenが生成されます.defファイル、ターゲット側(ogg)の対応するdirdefディレクトリにファイルをコピーする必要があります
2.3目標端
(1)oggユーザを確立し、oggパケットをoggユーザの下にコピーする
user add ogg
Passwd ogg -> ogg
chown -R ogg:ogg/home/ogg
(2)oggディレクトリに解凍
su - ogg
mkdir ogg
uuzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar -xvf *tar -C ogg
(3) env.sh
su - ogg
touch env.sh
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/java/jdk1.8.0_131/jre/lib/amd64/server/
export LD_LIBRARY_PATH=HOME/ogg
export JAVA_LIBDIR=/usr/share/java
(4)oggの初期化
su - ogg
source env.sh
cd ogg
./ggsci
===>実行:create subdirs
(5)構成プロセス
su - ogg
source env.sh
cd ogg
./ggsci
===>実行:edit param mgr
===>入力:
PORT 8015
ACCESSRULE, PROG *, IPADDR *, ALLOW
===>保存終了
===>実行:edit param rep 1
===>入力:
replicat rep1
sourcedefs ./dirdef/defgen.def
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props
MAP gg1db., TARGET gg1db.;
===>保存終了
===>実行:add replicat rep 1,exttrail./dirdat/rt(ここでexttrailはソース側のdump構成と一致する必要があることに注意)
(6) kafka.props構成(dirprmディレクトリ下)
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#######The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=tggtest#これは定義されたトピックです
############The following selects the message key using the concatenated primary keys
############gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
###########gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=tggtest############定義のトピック
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format=json
#########gg.handler.kafkahandler.format.insertOpKey=I
#######gg.handler.kafkahandler.format.updateOpKey=U
#########gg.handler.kafkahandler.format.deleteOpKey=D
#######gg.handler.kafkahandler.format.truncateOpKey=T
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
##########Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/usr/local/kafka/libs/*
##########Sample gg.classpath for HDP
#########gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
(7) custom_kafka_producer.properties構成(dirprmディレクトリ下)
bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
######## 100KB per partition
batch.size=16384
linger.ms=0
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
3起動
3.1非oggプログラム
Mysql起動;3306ポート
Kafka起動;9092ポート
Zookeeper起動;2181ポート
3.2ソース
Su - ggs
Source env.sh
Cd ogg
./ggsci
===>実行:start mgr
===>実行:start ext 1
===>実行:start pump 1
ログggserr.log
3.3ターゲット・エンド
Su - ogg
Source env.sh
Cd ogg
./ggsci
===>実行:start mgr
===>実行:start rep 1
TBD:ソース側が不明なポートを頻繁に要求する場合、dirdatのディレクトリ構成エラーであるか、ターゲット側で次の操作を実行できます.
cd ogg
./servier-p 7819(7819はソースggserrのポート)
ログggserr.log
3.4検査
(1)Kafkaトピックが自動的に作成されるかどうか
Cd/usr/local/kakfa
./bin/kafka-topics.sh --list --zookeeper localhost:2181
tggtest->説明作成結果が表示された場合
(2)mysqlソースにデータを挿入または更新し、関連ログを観察する
ソース:/home/ggs/ogg/dirdat/et 00000*
ターゲット側:/home/ogg/ogg/dirdat/rt 0000*
(3)Kafkaプログラム
-- coding:utf-8 --
import kafka
from kafka import KafkaConsumer
consumer = KafkaConsumer('tggtest', bootstrap_servers=['localhost:9092'])
for message in consumer:
print "%s:%d:%d: key=%s value=%s" %(message.topic, message.partition,message.offset, message.key,message.value)

挿入メッセージを受信
tggtest:0:65: key=gg1db.t1 value={"table":"gg1db.t1","op_type":"I","op_ts":"2019-05-20 22:26:24.000108","current_ts":"2019-05-20T22:26:30.252000","pos":"00000000000000008226","after":{"id":2,"name":"g"}}