canal実戦(二)|canal javaクライアント
##この文章はアリのオープンソースツールCanalに改編して、原版のウェブサイトhttps://github.com/alibaba/canal
前のcanalで構築して起動すると(転送局)、第2ステップcanal javaクライアントを開始できます.
mavenプロジェクトを構築し、mavenプロジェクトはできない子供靴があれば転送ステーションを参照することができます.
pom.xml依存追加
com.alibaba.otter canal.client 1.1.0
はい、コードを書くことができます.問題があるところはコメントを書きます.
ここで変更する必要があるのはcanalの配置関連で、残りのパラメータは自分で実際の状況に基づいて配置することができます
実行後、MySQLを実行してデータを挿入します.
コンソールに表示されます
================》; binlog[mysql-bin.000003:12626] , name[hhy_test,canal_test] , eventType : INSERT id : 1 update=true name : test update=true bz : 2020-11-20 16:07:39 update=true
OK、これでクライアント構成完了!
前のcanalで構築して起動すると(転送局)、第2ステップcanal javaクライアントを開始できます.
mavenプロジェクトを構築し、mavenプロジェクトはできない子供靴があれば転送ステーションを参照することができます.
pom.xml依存追加
com.alibaba.otter canal.client 1.1.0
はい、コードを書くことができます.問題があるところはコメントを書きます.
package com.hhy.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
@Component
public class CanalClient implements InitializingBean {
private final static int BATCH_SIZE = 1000;
public static void main(String args[]) {
CanalClient canalClient = new CanalClient();
try {
canalClient.afterPropertiesSet();
}catch (Exception e) {
e.printStackTrace();
}finally {
System.out.println("End");
}
}
@Override
public void afterPropertiesSet() throws Exception {
//
// canal ip, ,destination( canal.properties )
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.xxx.xxx", 11111), "example", "root", "xxxxxxxxx");
try {
//
connector.connect();
// ,
connector.subscribe(".*\\..*");
// ack , fetch , ack
connector.rollback();
while (true) {
//
Message message = connector.getWithoutAck(BATCH_SIZE);
// ID
long batchId = message.getId();
//
int size = message.getEntries().size();
//
if (batchId == -1 || size == 0) {
try {
// 2
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// ,
printEntry(message.getEntries());
}
// batch id 。 , batchId Message 。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* canal server binlog
*/
private static void printEntry(List entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
// / ,
continue;
}
//RowChange ,
// isDdl ddl sql ddl sql beforeColumns afterColumns
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
// :insert/update/delete
EventType eventType = rowChage.getEventType();
// Header
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
// DDL
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
// RowChange ,
for (RowData rowData : rowChage.getRowDatasList()) {
//
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//
} else {
//
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
ここで変更する必要があるのはcanalの配置関連で、残りのパラメータは自分で実際の状況に基づいて配置することができます
実行後、MySQLを実行してデータを挿入します.
insert into hhy_test.canal_test VALUES(1,'test',now())
コンソールに表示されます
================》; binlog[mysql-bin.000003:12626] , name[hhy_test,canal_test] , eventType : INSERT id : 1 update=true name : test update=true bz : 2020-11-20 16:07:39 update=true
OK、これでクライアント構成完了!