Canalの穴はどれくらい出会ったの?
Canalの穴はどれくらい出会ったの? mysql文字セット を変更ユーザーcanal を作成 canal をインストール esのバージョンが6でない場合.x.xの場合、client-adapterを再コンパイルする必要があります.elasticsearch
注意!!!mysqlバージョンは5.6+の学生ではありません.やはり5.6+にアップグレードしましょう.看板を出して、本当に分かりません.mysql 5のため6 binlogにいくつかの内容を加えて5.5と少し違いますが、公式ドキュメントでは5.6+の解析プロトコルを閉じていません.md、本当に迷っています.やはりアップグレードしましょう.しかし、異常を報告しますが、基本的には使えます.あなたたちを見てください.
canalはこの操作を行います.5.5これがないため、検査開始時にエラーが発生した.select @@global.binlog_checksumプロファイル を変更
vim canal.propertie
vim instance.properties起動 消費者クライアント 大きな穴、本当に穴で、多くのバージョンを変えて、メッセージを受け取ることができなくて、canalの内部の詳しい原理に対して理解していないのではないでしょうか.クライアントはデータベースの変更のメッセージを受け取ることができなくて、怒っていますか.公式の安定も指摘されていません..やっと解決した
getWithoutAck(int)この方法は問題があると感じて、自分の問題かもしれませんが、少なくとも現在は問題があります.この方法は非ブロックで、メッセージに損失の問題が発生し、個人的にはバグだと思います.ネット上のThreadもあります.sleep(1000)は非ブロックのcpu性能の問題を解決するために、できますが、取ることはできません.もう一つの致命的な問題は、サービス側のメッセージを得ることができなくて、私は一日中このことをしています.だから、みんなはラッキーです.ブロックを使う方法がもっとよくて、合理的にタイムアウト時間をコントロールすればいいです.
さあ、話そう
準備はmysqlからesまでリアルタイムで同期するコンポーネントを作るので、canalを使いたいのですが、一日このcanalをデバッグしています.ああ、料理が多すぎます.頑張ってください.共に進歩して、私のノートがあなたに実際の仕事の中で少し時間を節約することができることを望んで、にこにこ.
# [client]
default-character-set=utf8
# [mysql]
character-set-server=utf8
# [mysqld]
character-set-server=utf8
init_connect='SET NAMES utf8' ( mysql utf8 , mysql utf8 , )
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1. git clone https://github.com/alibaba/canal.git
2. idea
3.
pom es es , 5.6.10
com.alibaba.otter.canal.client.adapter.es.ESAdapter
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)),
Integer.parseInt(host.substring(i + 1))));
//transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
// Integer.parseInt(host.substring(i + 1))));
注意!!!mysqlバージョンは5.6+の学生ではありません.やはり5.6+にアップグレードしましょう.看板を出して、本当に分かりません.mysql 5のため6 binlogにいくつかの内容を加えて5.5と少し違いますが、公式ドキュメントでは5.6+の解析プロトコルを閉じていません.md、本当に迷っています.やはりアップグレードしましょう.しかし、異常を報告しますが、基本的には使えます.あなたたちを見てください.
canalはこの操作を行います.5.5これがないため、検査開始時にエラーが発生した.select @@global.binlog_checksum
vim canal.propertie
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#
canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
vim instance.properties
################################################
## mysql serverId , v1.0.26+ will autoGen
#canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
#mysql
canal.instance.master.address=127.0.0.1:3306
# binlog
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# mysql
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
#canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
sh startup.sh
getWithoutAck(int)この方法は問題があると感じて、自分の問題かもしれませんが、少なくとも現在は問題があります.この方法は非ブロックで、メッセージに損失の問題が発生し、個人的にはバグだと思います.ネット上のThreadもあります.sleep(1000)は非ブロックのcpu性能の問題を解決するために、できますが、取ることはできません.もう一つの致命的な問題は、サービス側のメッセージを得ることができなくて、私は一日中このことをしています.だから、みんなはラッキーです.ブロックを使う方法がもっとよくて、合理的にタイムアウト時間をコントロールすればいいです.
package com.yzz.es;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
* @author yzz
* @time 2019/3/7 14:25
* @E-mail [email protected]
* @since 0.0.1
*/
public class TestCanal {
public static void main(String[] args) throws Exception {
start();
}
public static void start() throws Exception {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.33.14", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*");
System.out.println("success");
while (true) {
Message message = connector.getWithoutAck(100, (long) 1, TimeUnit.SECONDS);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
//System.out.println("sleep");
continue;
}
System.out.println(batchId);
printEntries(message.getEntries());
connector.ack(batchId);
}
}
private static void printEntries(List entries) throws Exception {
System.out.println("********************");
for (Entry entry :
entries) {
CanalEntry.Header header = entry.getHeader();
System.out.println("logfileName: " + header.getLogfileName());
System.out.println("logfileOffset : " + header.getLogfileOffset());
System.out.println("executeTime :" + header.getExecuteTime());
System.out.println("schemaName: " + header.getSchemaName());
System.out.println("table_name: " + header.getTableName());
System.out.println("eventType: " + header.getEventType());
printRowChange(RowChange.parseFrom(entry.getStoreValue()));
}
System.out.println("********************");
}
private static void printRowChange(RowChange rowChange) {
System.out.println("rowchange");
System.out.println(rowChange.getSql());
List rowDatasList = rowChange.getRowDatasList();
for (RowData rowData :
rowDatasList) {
List beforeColumnsList = rowData.getBeforeColumnsList();
List afterColumnsList = rowData.getAfterColumnsList();
System.out.println("before");
printColum(beforeColumnsList);
System.out.println("after");
printColum(afterColumnsList);
}
}
private static void printColum(List columns) {
for (Column column :
columns) {
System.out.println(column.getName() + ":" + column.getValue());
}
}
private static void printColumns(List columns) {
String line = columns.stream()
.map(column -> column.getName() + "=" + column.getValue())
.collect(Collectors.joining(","));
System.out.println(line);
}
}
さあ、話そう
準備はmysqlからesまでリアルタイムで同期するコンポーネントを作るので、canalを使いたいのですが、一日このcanalをデバッグしています.ああ、料理が多すぎます.頑張ってください.共に進歩して、私のノートがあなたに実際の仕事の中で少し時間を節約することができることを望んで、にこにこ.