MysqlデータをElasticSearchクラスタにインポート
22276 ワード
一週間も博文を書いていないが、最近指導者が困難な任務を説明したので、他のことを考える暇がない.mysqlデータベースのデータをESクラスタに転送します.mysqlデータは、アリからデータを保存する場所を引いて、クラスタに保存します.ここまで言うと、読者はこれが何か難しいと思っているかもしれません.はい、データ量が少なく、数百万人か数千万人が確かに時間をかけて転送すればいいです.しかし、二十数億のデータ量であれば、どうすればいいのでしょうか.リーダーは私に1週間の時間を与えて、私がコードを書くことを含めて、デバッグのスピード、データを引き始めました.そのため、私はこの20億のデータに苦しめられています.
次は私がコードを書いて、私は半日を使ってコードを書いて、テストを行って、速度をデバッグして、配置を始めて、プログラムは走って、私はほっとしました.それから、私はそれを正式な環境に配置して、ESの中でindexを建てて、mysqlの中で1つのテストテーブルを建てて、数十万のデータを挿入して、mysqlのデータの中のデータをクラスタに導入し始めて、まあまあの様子です.この時一日はもう過ぎた.
それから私はそれを正式に操作し始めました.この時問題が来て、まず速度を捨てて言わないで、mysqlの中で1枚の表ごとに1億余りのデータ、データの中の未知のものが多すぎて、データのフォーマット、空、各種の奇抜な文字、特殊な文字など.私はこれまで思いもよらなかった異常処理を行い、コードを改善しました.そしてプログラムが通じるようになりました.mysqlからテーブルを探して、まず100万のデータを挿入して、試してみることにしました.問題はまた来て、mysqlの中から100万のデータを探しましたが、最終的にクラスタに挿入されたのは99万余りで、まだ何千ものデータがどこに行っているのか、私がいろいろな異常状況を考えても、こんなに多くのデータ量の損失があります.ボスが私に要求したのはデータ量の損失を百万分の1に抑えることですね.仕方なく、私は再びコードを改善して、私はもっと多くのlog情報を印刷して、クラスタに挿入した失敗したデータを記録しました.そしてこれらのデータを分析し、再びコードを改良し、今回は100万のデータをすべて挿入しました.私はやっとゆっくりした.
最後に、デバッグ速度です.マルチスレッドを採用し、mysqlからどのようにクエリーするか、クエリーされたデータはメモリに入れて直接クラスタに挿入するか、ファイルを書き込んだ後にファイルを読み込んでクラスタに挿入するかは、自分次第です.個人的には1つ目をお勧めします.もちろん、接続が切れたり、データ記録が失敗したり、ディスク負荷がかかったりするなど、ビッグデータ量の導入に注意しなければならない問題もたくさんあります.次は私の最初の概略コードを貼って、皆さんの参考にします.
次は私がコードを書いて、私は半日を使ってコードを書いて、テストを行って、速度をデバッグして、配置を始めて、プログラムは走って、私はほっとしました.それから、私はそれを正式な環境に配置して、ESの中でindexを建てて、mysqlの中で1つのテストテーブルを建てて、数十万のデータを挿入して、mysqlのデータの中のデータをクラスタに導入し始めて、まあまあの様子です.この時一日はもう過ぎた.
それから私はそれを正式に操作し始めました.この時問題が来て、まず速度を捨てて言わないで、mysqlの中で1枚の表ごとに1億余りのデータ、データの中の未知のものが多すぎて、データのフォーマット、空、各種の奇抜な文字、特殊な文字など.私はこれまで思いもよらなかった異常処理を行い、コードを改善しました.そしてプログラムが通じるようになりました.mysqlからテーブルを探して、まず100万のデータを挿入して、試してみることにしました.問題はまた来て、mysqlの中から100万のデータを探しましたが、最終的にクラスタに挿入されたのは99万余りで、まだ何千ものデータがどこに行っているのか、私がいろいろな異常状況を考えても、こんなに多くのデータ量の損失があります.ボスが私に要求したのはデータ量の損失を百万分の1に抑えることですね.仕方なく、私は再びコードを改善して、私はもっと多くのlog情報を印刷して、クラスタに挿入した失敗したデータを記録しました.そしてこれらのデータを分析し、再びコードを改良し、今回は100万のデータをすべて挿入しました.私はやっとゆっくりした.
最後に、デバッグ速度です.マルチスレッドを採用し、mysqlからどのようにクエリーするか、クエリーされたデータはメモリに入れて直接クラスタに挿入するか、ファイルを書き込んだ後にファイルを読み込んでクラスタに挿入するかは、自分次第です.個人的には1つ目をお勧めします.もちろん、接続が切れたり、データ記録が失敗したり、ディスク負荷がかかったりするなど、ビッグデータ量の導入に注意しなければならない問題もたくさんあります.次は私の最初の概略コードを貼って、皆さんの参考にします.
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by xxx on 2016/08/30.
*/
public class FileToEsOrderTest {
static ConcurrentLinkedQueue queues = new ConcurrentLinkedQueue();
static AtomicBoolean isInsert = new AtomicBoolean(true);
static TransportClient client = null;
public static void main(String[] agrs) throws Exception {
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "elasticsearch-cluster").build();
client = TransportClient.builder().settings(settings).build();
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("xxxxx"), 9500));
} catch (UnknownHostException error) {
System.out.print(error.getMessage());
}
final long aa = System.currentTimeMillis();
final ConcurrentHashMap hashMap = new ConcurrentHashMap();
for (int i = 0; i < 20; i++) {
new Thread(new Runnable() {
Integer num = 1;
public void run() {
//Add transport addresses and do something with the client...
hashMap.put(Thread.currentThread().getName(), Boolean.FALSE);
final BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
//
public void afterBulk(long l, BulkRequest bulkRequest,
BulkResponse bulkResponse) {
System.out.println(" :" +
bulkRequest.numberOfActions());
if (bulkResponse.hasFailures()) {
for (BulkItemResponse item :
bulkResponse.getItems()) {
if (item.isFailed()) {
System.out.println(" :--------" +
item.getFailureMessage());
}
}
}
}
//
public void beforeBulk(long executionId,
BulkRequest request) {
}
//
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
System.out.println("happen fail = " +
failure.getMessage() + " ,
cause = " + failure.getCause());
}
})
.setBulkActions(10000)
.setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB))
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(100), 3))
.setConcurrentRequests(1)
.build();
while (true) {
if (!queues.isEmpty()) {
try {
String json = queues.poll();
if (json == null) continue;
int index1 = json.indexOf("checksum");
int index2 = json.indexOf("}", index1);
index1 += 10;
String id = json.substring(index1 + 1, index2 - 1);
int index3 = json.indexOf("dp_id");
int index4 = json.indexOf(",", index3);
index3 += 7;
String routing = json.substring(index3 + 1, index4 - 1);
count++;
bulkProcessor.add(new IndexRequest("xxxx",
"xxxxx").id(id).routing(routing).source(json));
} catch (Exception e) {
System.out.print(e.getMessage());
}
}
if (queues.isEmpty() && !isInsert.get()) {
bulkProcessor.flush();
long jjj = System.currentTimeMillis() - aa;
System.out.print(" " + Thread.currentThread().getName()
+ ":" + jjj + " ");
hashMap.put(Thread.currentThread().getName(), Boolean.TRUE);
while (hashMap.values().contains(Boolean.FALSE)) {
try {
Thread.currentThread().sleep(1 * 1000);
} catch (Exception e) {
e.printStackTrace(System.out);
}
}
bulkProcessor.close();
break;
}
}
}
}).start();
}
// File file = new File("/test/rfm/rfm_data.txt");
// FileOutputStream fileOutputStream = new FileOutputStream((file));
// OutputStreamWriter outputStreamWriter =
// new OutputStreamWriter(fileOutputStream);
// bufferedWriter = new BufferedWriter(outputStreamWriter);
for(int i = 2; i <= 23; i++){
WriteData("xxx" + i);
}
// WriteData("rfm_1");
// bufferedWriter.close();
// outputStreamWriter.close();
// fileOutputStream.close();
System.out.println(" ");
}
//
public static void WriteData(String tableName) throws IOException {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Integer count = 1;
List columnName = Arrays.asList("trade_last_interval","trade_first_interval");
List columnDateName = Arrays.asList("modify","trade_first_time","trade_last_time");
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
try {
Class.forName("com.mysql.jdbc.Driver");
String url = "jdbc:mysql://xxxxxxxxx";
conn = DriverManager.getConnection(url, "xxxx", "xxxx");
System.out.println(" , MySQL:" + tableName);
String sql = "select * from " + tableName;
ps = conn.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
rs = ps.executeQuery();
ResultSetMetaData rsmd = rs.getMetaData();
int colCount = rsmd.getColumnCount();
ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.setDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
while(rs.next()) { //while
Map map = new LinkedHashMap<>();
//StringBuilder buffer = new StringBuilder();
for(int i = 1; i <= colCount; i++ ) {
String name = rsmd.getColumnName(i);
if(!columnName.contains(name)) {
String value = rs.getString(i);
boolean flag = true;
if(columnDateName.contains(name)){
try {
dateFormat.parse(value);
} catch (Exception e){
flag = false;
}
} else if("buyer_nick".equalsIgnoreCase(name)){
value = encrypt(value);
}
if (flag && value != null && !"".equals(value.trim()) && value.trim().length() > 0) {
//buffer.append("\"" + name + "\":\"" + value + "\"");
//buffer.append(",");
map.put(name, value);
}
}
}
count++;
if(map != null && map.size() > 0){
queues.add(objectMapper.writeValueAsString(map));
}
if(count % 200000 == 0){
int number = queues.size();
int jj = number/200000;
System.out.println("index: " + count + ",
jj: " + jj + ", number: " + number);
while(jj > 0){
try {
Thread.sleep(2000*jj);
} catch (InterruptedException e) {
e.printStackTrace();
}
int number2 = queues.size();
jj = number2 / 200000;
System.out.println("index2: " + count + ",
jj: " + jj + ", number2: " + number2);
}
}
}
isInsert = new AtomicBoolean(false);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}catch (SQLException e) {
e.printStackTrace();
} finally {
try {
if(rs != null) {
rs.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
try {
if(ps != null) {
ps.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
try {
if(conn != null) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
System.out.println(tableName + " , :" + count);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}