JAvaマルチスレッド同時処理大ファイル

6168 ワード

カードを打つことを堅持します!
この主な実装は、マルチスレッドが大きなファイルを処理することであり、ここでの大きなファイルは数十Mのファイルを指しています.例えば、私が下に書いた数百万個のデータを処理し、彼らをフィルタリングし、欲しいデータを得て指定したファイルに出力します.
最初は多くの回り道をして、私は今私の主な実現の構想を話して(ここでも多くの大物たちの意見を参考にしました)、自分が書いたときこそ自分のものです.
主な考え方:
1スレッドプールを作成し、複数のスレッドを作成します.同じファイルを同時に読み込む
2,ここでスレッドの実装はCallableインタフェースを実装することによってcall()を書き換えることである.
3,マルチスレッドを実現する以上,同時実行.だから、各スレッドは自分の操作を実行する責任を負わなければなりません.私は前に欠点がありました.私は多くのスレッドを読んでいましたが、これらのスレッドはすべて1つの読み取りの対象を共有しています.つまり、他のスレッドが処理されてから自分の番になるのを待つことです.これはシリアルです.単一のスレッドで実行するほうがいいです.方法を調べたところ、読み込むファイルをバイト単位で分割し、ブロック(領域)する方法がある.すなわち,複数のスレッドがソースファイルを複数のブロック(領域)に分割し,各スレッドは自分の担当するブロックのみを操作し,互いに影響を及ぼさない.
4,RandomAccessFileこれが上で実現したコアクラスです.よく調べてみてください.私はまだ深く理解していません.次のところが正しくなければ、大物たちにもっと指摘してください.
5,ブロック分割は一つの問題に関連し,分割するときである.カットポイントは1行のデータの真ん中にカットされていますが、どうすればいいですか?
雨が降ったら
家に帰って服を片付けた.
 
この開始位置と末尾位置の文字を改行文字かどうかを判断します:r,r,
そうでなければ、ロー全体を完全に出力できるように、ローの開始位置を見つける再帰を実現します.
6,結果を指定ファイルに出力する場合,RandomAccessFileのwriteメソッドを用いることができる.write()メソッドは、呼び出しが終了した後にファイルポインタを自動的に移動するので、次のデータを書き込む場所に頻繁にポインタを移動する必要はありません.従って、最終的に得られた処理後の結果データは無秩序である.
(私もテストしたことがありますが、スレッドごとに1つの一時ファイルに出力し、最終的に1つの総ファイルに統合し、一時ファイルを削除します.このような結果は秩序がありますが、少し蛇足しているような気がします.皆さんはもっと良い実現方法があるかどうか分かりません)
参照コード:
BufferIO.class
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author SanYi&&HLing
 * 
 *           :   ---       --- txt    ---  ,       txt   
 * */
public class BufferIO {
	/**
	 *         4
	 * */
	public static final int CORE_POOL_SIZE = 4;
	/**
	 *         4
	 * */
	public static final int MAX_MUM_POOL_SIZE = 4;
	/**
	 *         
	 * */
	public static final int KEEP_ALIVE_TIME = 1;

	/**
	 * main    
	 * */
	public static void main(String[] args) throws IOException,
			InterruptedException {
		//       
		long start = System.currentTimeMillis();
		//   RandomAccessFile     
		RandomAccessFile fileWirter = null;
		//        
		ThreadPoolExecutor executor = null;

		try {
			//        
			executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
					MAX_MUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES,
					new ArrayBlockingQueue(1));
			//    tasks
			List tasks = new ArrayList<>();
			//        
			fileWirter = new RandomAccessFile("result.txt", "rw");
			//        
			for (int i = 0; i < CORE_POOL_SIZE; i++) {
				tasks.add(new CopyFile("data.txt", CORE_POOL_SIZE, i + 1,
						fileWirter));
			}
			//          
			List> futures = executor.invokeAll(tasks);
			//       ,         ,              
			for (Future future : futures) {
				future.get();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			//   
			if (fileWirter != null) {
				fileWirter.close();
			}
			if (executor != null) {
				executor.shutdown();
			}
		}
		//       
		long end = System.currentTimeMillis();
		System.out.println("   " + (end - start) + "ms");
	}

}

CopyFile.class
import java.io.RandomAccessFile;
import java.util.concurrent.Callable;

/**
 * @author SanYi&&HLing 
 * 	    Callable  ,   call        
 * */
public class CopyFile implements Callable {

	//    
	private String fileName;
	private int threadSize = 1;
	private int currentBlock = 1;
	private RandomAccessFile fileWirter;

	/**
	 *       
	 * 
	 * @param fileName
	 *               
	 * @param threadSize
	 *               
	 * @param currentBlock
	 *               
	 * @param fileWirter
	 *               
	 * */
	public CopyFile(String fileName, int threadSize, int currentBlock,
			RandomAccessFile fileWirter) {
		this.fileName = fileName;
		this.threadSize = threadSize;
		this.currentBlock = currentBlock;
		this.fileWirter = fileWirter;
	}

	/**
	 *     call()
	 * */
	@Override
	public Object call() throws Exception {
		//   
		RandomAccessFile fileAccess = null;
		try {

			//      currentPosition,      nextPosition

			//         
			fileAccess = new RandomAccessFile(fileName, "r");
			//          
			long len = fileAccess.length();
			//            ,              。       
			long step = len / threadSize;
			//         ,          
			long currentPosition = step * (currentBlock - 1);
			//           
			long nextPosition = currentPosition + step;

			//      ,                 ,          
			//        ,               ,            
			//           ,       
			//       :\r
, \r,
// "\r
" // , , \r

, , fileAccess.seek(nextPosition); char lastChar = (char) fileAccess.read(); if (lastChar == '
') { nextPosition += 1; } // fileAccess.seek(currentPosition); if (currentPosition > 0) { currentPosition += 1; long offset = 1; // fileAccess.seek(currentPosition - offset); // int beforeChar = fileAccess.read(); // \r,
while (beforeChar != '
' && beforeChar != '\r') { offset += 1; fileAccess.seek(currentPosition - offset); beforeChar = fileAccess.read(); } } // String lineValue = ""; long filePointer; // while ((lineValue = fileAccess.readLine()) != null) { // filePointer = fileAccess.getFilePointer(); // if (filePointer != len && filePointer > nextPosition) { break; } else if (!lineValue.isEmpty()) { // if (!lineValue.endsWith("@live.com")) { // fileWirter.write((lineValue + "
").getBytes()); } } } } catch (Exception e) { e.printStackTrace(); } finally { // if (fileAccess != null) { fileAccess.close(); } } return null; } }

時間がちょっと急いでいるので、よく書けないところがあるかもしれません.この文章があなたに役に立つと思ったら、いいねと励ましてほしいです.読んでくれてありがとう~