Canal採集mysql動的監視、データ採集を実現し、kafkaにデータを送信する
12002 ワード
Canal採集プログラム構築
Java言語を使用してcanalのbinlogログを解析し、Kafkaに書き込む
mysql binlogログを有効化する:viを使用して /etc/my.cnf を開き、以下の構成を追加する。
3.mysqlを再起動する
4.Mysqlクライアント入力へ
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
canalクライアントの作成
pom依存のインポート
二、GlobalConfigUtilツールクラスを作成し、application.properties プロファイルを読み取る
ステップ:GlobalConfigUtilツールクラスを作成し、application.properties の canal と kafka の構成を読み込む。mainメソッドを追加し、構成を正しく読み取れるかどうかをテストする。
GlobalConfigUtil.java
KafkaSenderツールクラスを作成し、kafkaにデータを送信する — KafkaSender.java
Canalクライアントクラスを作成し、binlogログを解析してkafkaにデータを送信する — CanalClient.java
Java言語を使用してcanalのbinlogログを解析し、Kafkaに書き込む
mysql binLogログを開く
# binlog /var/lib/mysql , mysql-bin
log-bin=/var/lib/mysql/mysql-bin
# mysql
binlog-format=ROW
# ID( mysql , )
server_id=1
3.mysqlを再起動する
service mysqld restart
4.Mysqlクライアント入力へ
show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
canalクライアントの作成
pom依存のインポート
com.alibaba.otter
canal.client
1.0.24
org.apache.kafka
kafka_2.11
0.9.0.1
com.alibaba
fastjson
1.2.44
二、GlobalConfigUtilツールクラスを作成し、application.properties プロファイルを読み取る
ステップ
GlobalConfigUtil.java
import java.util.ResourceBundle;
/**
*
*/
/**
 * Loads canal and kafka settings from {@code application.properties} on the classpath.
 *
 * <p>All values are read once during class initialization via {@link ResourceBundle};
 * a missing bundle or key raises {@code MissingResourceException} the first time the
 * class is touched. Fields are {@code final}: configuration never changes after load.
 */
public final class GlobalConfigUtil {
    // Backing bundle for application.properties (resolved from the classpath root).
    private static final ResourceBundle resourceBundle = ResourceBundle.getBundle("application");
    // Canal server connection settings.
    public static final String canalHost = resourceBundle.getString("canal.host");
    public static final String canalPort = resourceBundle.getString("canal.port");
    public static final String canalInstance = resourceBundle.getString("canal.instance");
    // MySQL credentials used by the canal connector.
    public static final String mysqlUsername = resourceBundle.getString("mysql.username");
    public static final String mysqlPassword = resourceBundle.getString("mysql.password");
    // Kafka connection settings and the topic that binlog events are published to.
    public static final String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
    public static final String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
    public static final String kafkaInput = resourceBundle.getString("kafka.input.topic");

    /** Utility class: no instances. */
    private GlobalConfigUtil() {
    }

    /** Smoke test: prints one value to confirm the bundle resolves correctly. */
    public static void main(String[] args) {
        System.out.println(canalHost);
    }
}
KafkaSenderツールクラスを作成し、kafkaにデータを送信する — KafkaSender.java
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import java.util.Properties;
/**
* Kafka
*/
/**
 * Helper for publishing string messages to Kafka via the legacy (0.9 scala)
 * producer API.
 *
 * <p>Fix: the original created a brand-new {@code Producer} on every call to
 * {@link #sendMessage} and never closed it, leaking network threads and
 * connections. A single shared producer is now created lazily and reused.
 */
public class KafkaSender {
    // Topic kept for instance-style construction; the static send path takes
    // the topic explicitly.
    private String topic;

    // Shared producer, created on first use and reused for every send.
    private static Producer<String, String> producer;

    public KafkaSender(String topic) {
        super();
        this.topic = topic;
    }

    /**
     * Sends one keyed message to the given Kafka topic.
     *
     * @param topic destination topic
     * @param key   partitioning key
     * @param data  message payload
     */
    public static void sendMessage(String topic, String key, String data) {
        getProducer().send(new KeyedMessage<String, String>(topic, key, data));
    }

    /**
     * Returns the shared producer, creating it on first use.
     * Synchronized so concurrent first calls cannot build two producers.
     *
     * @return the process-wide string producer
     */
    private static synchronized Producer<String, String> getProducer() {
        if (producer == null) {
            Properties properties = new Properties();
            properties.put("metadata.broker.list", GlobalConfigUtil.kafkaBootstrap);
            properties.put("zookeeper.connect", GlobalConfigUtil.kafkaZookeeper);
            properties.put("serializer.class", StringEncoder.class.getName());
            producer = new Producer<String, String>(new ProducerConfig(properties));
        }
        return producer;
    }
}
Canalクライアントクラスを作成し、binlogログを解析してkafkaにデータを送信する — CanalClient.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.itheima.canal_kafka.util.GlobalConfigUtil;
import com.itheima.canal_kafka.util.KafkaSender;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Canal binlog
*/
/**
 * Canal client: subscribes to MySQL binlog events through a canal server,
 * converts each row change into a JSON document and publishes it to Kafka.
 *
 * <p>Fixes over the original: a failed {@code RowChange.parseFrom} previously
 * left {@code rowChange} null and caused a guaranteed NPE on the next line —
 * it now fails fast with the cause attached; raw {@code List} types are
 * parameterized; the empty-batch path no longer busy-spins.
 */
public class CanalClient {

    /** One column of a changed row: name, value and canal's "updated" flag. */
    static class ColumnValuePair {
        private String columnName;
        private String columnValue;
        private Boolean isValid;

        public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {
            this.columnName = columnName;
            this.columnValue = columnValue;
            this.isValid = isValid;
        }

        public String getColumnName() { return columnName; }
        public void setColumnName(String columnName) { this.columnName = columnName; }
        public String getColumnValue() { return columnValue; }
        public void setColumnValue(String columnValue) { this.columnValue = columnValue; }
        public Boolean getIsValid() { return isValid; }
        public void setIsValid(Boolean isValid) { this.isValid = isValid; }
    }

    /**
     * Opens a connector to a single canal server instance.
     *
     * @param host     canal server host
     * @param port     canal server port
     * @param instance canal destination (instance) name
     * @param username MySQL user name
     * @param password MySQL password
     * @return an unconnected connector; the caller must connect()/disconnect()
     */
    public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
        return CanalConnectors.newSingleConnector(
                new InetSocketAddress(host, port), instance, username, password);
    }

    /**
     * Parses a batch of binlog entries and forwards each row change to Kafka.
     *
     * @param entries    binlog entries fetched from canal
     * @param emptyCount batch counter passed through into the emitted JSON
     *                   (NOTE(review): despite the name it is incremented by the
     *                   caller on NON-empty batches — kept as-is; confirm intent)
     */
    public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {
        for (CanalEntry.Entry entry : entries) {
            // Transaction begin/end markers carry no row data; skip them.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            // Deserialize the row-change payload. Originally a parse failure was
            // printed and then dereferenced as null (NPE); fail fast with the cause.
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Failed to parse binlog RowChange payload", e);
            }
            // Change type (insert / update / delete / ddl ...).
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Binlog file name and offset of this entry.
            String logfileName = entry.getHeader().getLogfileName();
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Database and table the change applies to, plus execution time.
            String dbName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            long timestamp = entry.getHeader().getExecuteTime();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // DELETE only has a "before" image; every other event type
                // (INSERT, UPDATE, ...) is described by its "after" image.
                List<CanalEntry.Column> columns = (eventType == CanalEntry.EventType.DELETE)
                        ? rowData.getBeforeColumnsList()
                        : rowData.getAfterColumnsList();
                dataDetails(columns, logfileName, logfileOffset, dbName, tableName,
                        eventType, emptyCount, timestamp);
            }
        }
    }

    /**
     * Serializes one row change to JSON and sends it to the configured Kafka topic.
     *
     * @param columns       column images for the row (before-image for DELETE)
     * @param logFileName   binlog file name
     * @param logFileOffset offset within the binlog file
     * @param dbName        database name
     * @param tableName     table name
     * @param eventType     change type (insert / update / delete / ...)
     * @param emptyCount    batch counter forwarded from the caller
     * @param timestamp     binlog execution time (ms epoch per canal header)
     */
    private static void dataDetails(List<CanalEntry.Column> columns,
                                    String logFileName,
                                    Long logFileOffset,
                                    String dbName,
                                    String tableName,
                                    CanalEntry.EventType eventType,
                                    int emptyCount,
                                    long timestamp) {
        // Flatten the protobuf columns into simple name/value/isValid beans.
        List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>();
        for (CanalEntry.Column column : columns) {
            columnValueList.add(new ColumnValuePair(
                    column.getName(), column.getValue(), column.getUpdated()));
        }
        // Random key: spreads messages across partitions, no ordering guarantee.
        String key = UUID.randomUUID().toString();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("logFileName", logFileName);
        jsonObject.put("logFileOffset", logFileOffset);
        jsonObject.put("dbName", dbName);
        jsonObject.put("tableName", tableName);
        jsonObject.put("eventType", eventType);
        jsonObject.put("columnValueList", columnValueList);
        jsonObject.put("emptyCount", emptyCount);
        jsonObject.put("timestamp", timestamp);
        String data = JSON.toJSONString(jsonObject);
        System.out.println(data);
        // Publish the serialized row change to the configured input topic.
        KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, key, data);
    }

    /**
     * Entry point: connects to canal, subscribes to all schemas/tables and
     * pumps batches of binlog entries into Kafka.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connection settings loaded from application.properties.
        String host = GlobalConfigUtil.canalHost;
        int port = Integer.parseInt(GlobalConfigUtil.canalPort);
        String instance = GlobalConfigUtil.canalInstance;
        String username = GlobalConfigUtil.mysqlUsername;
        String password = GlobalConfigUtil.mysqlPassword;
        CanalConnector conn = getConn(host, port, instance, username, password);
        // Max entries fetched per poll.
        int batchSize = 100;
        // Counts batches that contained data (name is historical; see analysis()).
        int emptyCount = 1;
        try {
            conn.connect();
            // Subscribe to every schema and table.
            conn.subscribe(".*\\..*");
            conn.rollback();
            int totalCount = 120; // stop after this many non-empty batches
            while (totalCount > emptyCount) {
                // Fetch without auto-ack so we can ack explicitly below.
                Message message = conn.getWithoutAck(batchSize);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id == -1 || size == 0) {
                    // Nothing new: back off briefly instead of busy-spinning.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    analysis(message.getEntries(), emptyCount);
                    emptyCount++;
                }
                // Acknowledge the batch so canal can advance its cursor.
                conn.ack(message.getId());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            conn.disconnect();
        }
    }
}