canal実戦(一):canal接続kafkaリアルタイム同期mysqlデータを実現
7715 ワード
本ブログのすべての文章はオリジナルで、もし転載があれば、原文の住所を明記してください.ありがとうございます.桃の花は春風を惜しむhttps://blog.csdn.net/xiaoyu_BD/article/details/82261856
canal-kafkaの応用については前述した.canal-kafkaはkafkaをクライアントとしてcanalに埋め込み,canalに基づいてソースコードを修正し,特定の実装canalからkafkaへの伝送を達成する.
canal-kafkaはアリクラウドが最近更新した新しいインストールパッケージです.主な機能はcanalとkafkaのドッキングを実現し、大量のメッセージ伝送同期を実現することである.canal-kafkaでは、メッセージはByteStringで伝送され、ユーザーは構成によってkafkaの構成を指定するしかなく、ある程度の限界があるため、canalを使用してクライアントkafkaを定義すると、より柔軟性がありますが、より大きく維持されるので、実際の状況に応じてどのように選択するかが決まります.
参考:canalソース分析(一)クライアントプログラム
maven依存の構築
注意バージョン対応
SimpleCanalClient
analKafkaProducer
canalToKafkaServer
この簡単なcanalからkafkaまでのdemoはすでに完成しています.これらはテストコードにすぎず、実際のアプリケーションでは状況に応じて、自分でより多くの機能を開発することができます.
作者:桃花惜春风転載出典を明記してください、原文の住所:https://blog.csdn.net/xiaoyu_BD/article/details/82261856もし本文があなたに役に立つと感じたら、あなたの支持は私が書くことを堅持する最大の動力で、ありがとうございます!
----------------------桃花惜春風のCSDNブログから、全文アドレスをクリックしてください.https://blog.csdn.net/xiaoyu_BD/article/details/82261856?utm_source=copy
canal-kafkaの応用については前述した.canal-kafkaはkafkaをクライアントとしてcanalに埋め込み,canalに基づいてソースコードを修正し,特定の実装canalからkafkaへの伝送を達成する.
canal-kafkaはアリクラウドが最近更新した新しいインストールパッケージです.主な機能はcanalとkafkaのドッキングを実現し、大量のメッセージ伝送同期を実現することである.canal-kafkaでは、メッセージはByteStringで伝送され、ユーザーは構成によってkafkaの構成を指定するしかなく、ある程度の限界があるため、canalを使用してクライアントkafkaを定義すると、より柔軟性がありますが、より大きく維持されるので、実際の状況に応じてどのように選択するかが決まります.
参考:canalソース分析(一)クライアントプログラム
maven依存の構築
com.alibaba.otter
canal.client
1.0.25
org.apache.kafka
kafka-clients
1.1.0
注意バージョン対応
SimpleCanalClient
package com.unigroup.client.canal;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.core.canal.CanalToKG;
/**
* @Title: SimpleCanalClient.java
* @Package com.unigroup.canal
* @Description: canal
* @author
* @date 2018 8 29 11:56:09
* @version V1.0
*/
public class SimpleCanalClient {
private CanalConnector connector=null;
public SimpleCanalClient(String ip,String port,String instance) {
//
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, Integer.parseInt(port)),instance, "", "");
}
public List execute(int batchSize,Class> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {
//int batchSize = 1;
int emptyCount = 0;
Object obj = clazz.newInstance();
Method method = clazz.getMethod("send",Message.class);
try {
connector.connect();
// connector.subscribe(".*\\..*");
connector.subscribe("test.test1");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); //
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
method.invoke(obj, message);
}
connector.ack(batchId); //
// connector.rollback(batchId); // ,
}
System.out.println("empty too many times, exit");
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
connector.disconnect();
}
return null;
}
}
analKafkaProducer
package com.unigroup.kafka.producer;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;
/**
* @Title: CanalKafkaProducer.java
* @Package com.unigroup.kafka.producer
* @Description:
* @author
* @date 2018 9 3 11:53:35
* @version V1.0
*/
public class CanalKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
private Producer producer;
public void init(KafkaProperties kafkaProperties) {
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaProperties.getServers());
properties.put("acks", "all");
properties.put("retries", kafkaProperties.getRetries());
properties.put("batch.size", kafkaProperties.getBatchSize());
properties.put("linger.ms", kafkaProperties.getLingerMs());
properties.put("buffer.memory", kafkaProperties.getBufferMemory());
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", MessageSerializer.class.getName());
producer = new KafkaProducer(properties);
}
public void stop() {
try {
logger.info("## stop the kafka producer");
producer.close();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping kafka producer:", e);
} finally {
logger.info("## kafka producer is down.");
}
}
public void send(Topic topic, Message message) throws IOException {
ProducerRecord record;
if (topic.getPartition() != null) {
record = new ProducerRecord(topic.getTopic(), topic.getPartition(), null, message);
} else {
record = new ProducerRecord(topic.getTopic(), message);
}
producer.send(record);
if (logger.isDebugEnabled()) {
logger.debug("send message to kafka topic: {}
{}", topic.getTopic(), message.toString());
}
}
}
canalToKafkaServer
package com.unigroup.kafka.server;
import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;
/**
* @Title: canal.java
* @Package com.unigroup.kafka.server
* @Description:
* @author
* @date 2018 9 3 11:23:35
* @version V1.0
*/
public class canalToKafkaServer {
public static void execute() {
SimpleCanalClient simpleCanalClient = new SimpleCanalClient(GetProperties.getValue("MYSQL_HOST"),
GetProperties.getValue("MTSQL_PORT"), GetProperties.getValue("INSTANCE"));
try {
simpleCanalClient.execute(1,CanalKafkaProducer.class);
} catch (Exception e) {
e.printStackTrace();
}
}
}
この簡単なcanalからkafkaまでのdemoはすでに完成しています.これらはテストコードにすぎず、実際のアプリケーションでは状況に応じて、自分でより多くの機能を開発することができます.
作者:桃花惜春风転載出典を明記してください、原文の住所:https://blog.csdn.net/xiaoyu_BD/article/details/82261856もし本文があなたに役に立つと感じたら、あなたの支持は私が書くことを堅持する最大の動力で、ありがとうございます!
----------------------桃花惜春風のCSDNブログから、全文アドレスをクリックしてください.https://blog.csdn.net/xiaoyu_BD/article/details/82261856?utm_source=copy