Javaマルチスレッド最適化方法と使い方


一、マルチスレッド紹介
プログラムでは、多くの業務システムで同時に処理する必要があるので、避けてはいけないのはマルチスレッドのプログラミング問題です。同時進行のシーンではマルチスレッドが非常に重要です。また、面接の時、面接官は通常、スレッドについて質問します。私達は通常このように答えます。主に二つの方法があります。第一に、Thread類を受け継ぎ、run方法を書き直します。第二種類:Runnableインターフェースを実現し、run方法を書き換える。面接官は必ずこの二つの方法の長所と短所はどこにあるかを聞いてみます。いずれにしても、一つの結論を出します。
この時、マルチスレッドの戻り値を得るにはどうすればいいですか?私たちが多く学んだ知識によって、Callableインターフェースを実現し、call方法を書き直したいと思います。マルチスレッドは実際のプロジェクトではどのように使いますか?彼はいくつの方法がありますか?
まず、例を見てみましょう。
 
これはマルチスレッドを作成する簡単な方法であり、例えば、異なるビジネスシーンによって、Thread()に異なるパラメータが入ってきて、異なる業務ロジックを実現することができます。しかし、この方法はマルチスレッドの暴落を作成する問題はスレッドを繰り返し作成し、スレッドを作成しても廃棄します。併発シーンに対して要求が低い場合は、このような方式もいいようですが、高合併のシーンでは、このような方式はだめです。スレッドを作成してスレッドを廃棄するのは非常に資源がかかります。だから、経験によって、正確なやり方はスレッド池技術を使って、JDKは複数のスレッド池タイプを提供しています。具体的にはjdkの文書を調べることができます。
 
ここのコードに注意したいのは、着信パラメータは私たちが構成するスレッドの数を表しています。多ければ多いほどいいですか?違います。私たちはスレッド数を設定する時にサーバの性能を十分に考慮しなければならないので、スレッド配置が多く、サーバの性能が優れているとは限りません。通常、マシンの計算はスレッド数で決まりますが、スレッド数がピークに達すると計算できなくなります。CPUを消費する業務ロジック(計算が多い)であれば、スレッド数は核数と同じでピークに到達します。I/Oを消費する業務ロジック(オペレーティングデータベース、ファイルのアップロード、ダウンロードなど)であれば、スレッド数が多いほど、性能を向上させる効果があります。
スレッド数の設定は、もう一つの数式で決定されます。
Y=N*((a+b)/a)では、N:CPUコア数、a:スレッド実行時のプログラムの計算時間、b:スレッド実行時のプログラムのブロック時間。この公式があると、スレッド池のスレッド数配置が制約されます。機械の実際の状況に応じて柔軟に配置できます。
二、マルチスレッド最適化と性能比較
最近のプロジェクトでは、スレッド技術を使用していますが、使用中に多くのトラブルが発生しました。熱のあるうちに、いくつかのマルチスレッドフレームの性能比較を整理してみてください。現在把握されているのは大体3種類で、第一はThreadPool(スレッド池)+Count DownLatch(プログラムカウンタ)、第二はFork/Joinフレーム、第三はJDK 8パラレルストリーム、以下これらの方法のマルチスレッド処理性能を比較してまとめます。
まず、業務シーンを想定して、メモリ内に複数のファイルオブジェクトを生成し、ここで暫定的に30000、(Thread.sleep)スレッド睡眠シミュレーション業務処理ロジックを用いて、これらの方法のマルチスレッド処理性能を比較する。
1)単スレッド
このような方法は非常に簡単ですが、プログラムは処理の過程で非常に時間がかかります。各スレッドは現在のスレッドの実行を待っているので、マルチスレッドとは関係がないので、効率が非常に低いです。
まずファイルオブジェクトを作成します。コードは以下の通りです。

public class FileInfo {
 private String fileName;//   
 private String fileType;//    
 private String fileSize;//    
 private String fileMD5;//MD5 
 private String fileVersionNO;//     
 public FileInfo() {
  super();
 }
 public FileInfo(String fileName, String fileType, String fileSize, String fileMD5, String fileVersionNO) {
  super();
  this.fileName = fileName;
  this.fileType = fileType;
  this.fileSize = fileSize;
  this.fileMD5 = fileMD5;
  this.fileVersionNO = fileVersionNO;
 }
 public String getFileName() {
  return fileName;
 }
 public void setFileName(String fileName) {
  this.fileName = fileName;
 }
 public String getFileType() {
  return fileType;
 }
 public void setFileType(String fileType) {
  this.fileType = fileType;
 }
 public String getFileSize() {
  return fileSize;
 }
 public void setFileSize(String fileSize) {
  this.fileSize = fileSize;
 }
 public String getFileMD5() {
  return fileMD5;
 }
 public void setFileMD5(String fileMD5) {
  this.fileMD5 = fileMD5;
 }
 public String getFileVersionNO() {
  return fileVersionNO;
 }
 public void setFileVersionNO(String fileVersionNO) {
  this.fileVersionNO = fileVersionNO;
 }
続いて、シミュレーション業務処理は、30000個のファイルオブジェクトを作成し、スレッド睡眠は1 ms、以前に設定した1000 msで、発見時間は長く、全体のEclipseカードが落ちたので、時間を1 msに変更します。

public class Test {
   private static List<FileInfo> fileList= new ArrayList<FileInfo>();
   public static void main(String[] args) throws InterruptedException {
     createFileInfo();
     long startTime=System.currentTimeMillis();
     for(FileInfo fi:fileList){
       Thread.sleep(1);
     }
     long endTime=System.currentTimeMillis();
     System.out.println("     :"+(endTime-startTime)+"ms");
   }
   private static void createFileInfo(){
     for(int i=0;i<30000;i++){
       fileList.add(new FileInfo("      ","jpg","101522","md5"+i,"1"));
     }
   }
}
テストの結果は以下の通りです。
 
30000個のファイルオブジェクトを生成するのに時間がかかり、1分近くかかり、効率が低いことが見られます。
2)ThreadPool(スレッド池)+CountDownLatch(プログラムカウンタ)
名前の通り、CountDownLatchはスレッドカウンタであり、彼の実行過程は以下の通りである。まず、メインスレッドでawait()メソッドを呼び出し、メインスレッドがブロックされた後、プログラムカウンタをパラメータとしてスレッドオブジェクトに伝達し、最後にスレッドごとにタスクを実行した後、countDown()メソッドでタスクを完了したことを示す。count Down()が複数回実行されると、メインスレッドのawait()が無効になります。実現過程は以下の通りです。

public class Test2 {
 private static ExecutorService executor=Executors.newFixedThreadPool(100);
 private static CountDownLatch countDownLatch=new CountDownLatch(100);
 private static List<FileInfo> fileList= new ArrayList<FileInfo>();
 private static List<List<FileInfo>> list=new ArrayList<>();
 public static void main(String[] args) throws InterruptedException {
  createFileInfo();
  addList();
  long startTime=System.currentTimeMillis();
  int i=0;
  for(List<FileInfo> fi:list){
   executor.submit(new FileRunnable(countDownLatch,fi,i));
   i++;
  }
  countDownLatch.await();
  long endTime=System.currentTimeMillis();
  executor.shutdown();
  System.out.println(i+"     :"+(endTime-startTime)+"ms");
 }
 private static void createFileInfo(){
  for(int i=0;i<30000;i++){
   fileList.add(new FileInfo("      ","jpg","101522","md5"+i,"1"));
  }
 }
 private static void addList(){
  for(int i=0;i<100;i++){
   list.add(fileList);
  }
 }
}
FileRunnable類:

/**
 *      
 * @author wangsj
 *
 * @param <T>
 */
public class FileRunnable<T> implements Runnable {
   private CountDownLatch countDownLatch;
   private List<T> list;
   private int i;
   public FileRunnable(CountDownLatch countDownLatch, List<T> list, int i) {
     super();
     this.countDownLatch = countDownLatch;
     this.list = list;
     this.i = i;
   }
   @Override
   public void run() {
     for(T t:list){
       try {
          Thread.sleep(1);
       } catch (InterruptedException e) {
          e.printStackTrace();
       }
       countDownLatch.countDown();
     }
   }
}
テストの結果は以下の通りです。
 
3)Fork/Joinフレーム
Jdkはバージョン7からFork/joinのフレームワークが出てきます。文字どおりに、forkは分割、joinは合併です。だから、このフレームの思想はです。forkでタスクを分割し、ジョインで分割した後、各人物の実行後の結果をまとめます。例えば、連続加算の数を計算します。2+4+5+7=私たちはFork/joinのフレームワークを使ってどのように完成しますか?分子を解体するという考えで、この演算を二つのサブタスクに分割できます。一つは2+4を計算し、もう一つは5+7を計算します。
Fork/Joinフレームワーク実行思想:まず、タスクを分割し、forkクラスを使用して大きなタスクをいくつかのサブタスクに分割します。この分割プロセスは、分割されたタスクが十分小さいまで、実際の状況に応じて決定される必要があります。その後、ジョインクラスはタスクを実行し、分割されたサブタスクは異なるキューにあり、いくつかのスレッドはそれぞれキューからタスクを取得して実行し、実行した結果は個々のキューに入れ、最後にスレッドを起動し、結果をキューから取り出して結果を統合します。
Fork/Joinフレームワークを使用するにはいくつかのクラスが必要であり、クラスの使用方法についてはJDKのAPIを参照することができ、このフレームを使用するには、まずForkJoinTaskクラスを継承する必要があり、通常は、彼のサブクラスRecursiveTaskまたはRecursive Actionを継承するだけでよく、Recursive Taskは、リターンの結果がないシーンに使用される。ForkJoinTaskの実行は、分割されたサブタスクを維持するために異なるタスクキューに追加されるForkJoinPoolを用いて実行される必要がある。
以下は実現コードです。

public class Test3 {
 private static List<FileInfo> fileList= new ArrayList<FileInfo>();
// private static ForkJoinPool forkJoinPool=new ForkJoinPool(100);
// private static Job<FileInfo> job=new Job<>(fileList.size()/100, fileList);
 public static void main(String[] args) {
  createFileInfo();
  long startTime=System.currentTimeMillis();
  ForkJoinPool forkJoinPool=new ForkJoinPool(100);
  //    
  Job<FileInfo> job=new Job<>(fileList.size()/100, fileList);
  //        
ForkJoinTask<Integer> fjtResult=forkJoinPool.submit(job);
//  
  while(!job.isDone()){
   System.out.println("    !");
  }
  long endTime=System.currentTimeMillis();
  System.out.println("fork/join    :"+(endTime-startTime)+"ms");
 }
 private static void createFileInfo(){
  for(int i=0;i<30000;i++){
   fileList.add(new FileInfo("      ","jpg","101522","md5"+i,"1"));
  }
 }
}
/**
 *      
 * @author wangsj
 *
 */
public class Job<T> extends RecursiveTask<Integer> {
 private static final long serialVersionUID = 1L;
 private int count;
 private List<T> jobList;
 public Job(int count, List<T> jobList) {
  super();
  this.count = count;
  this.jobList = jobList;
 }
 /**
  *     ,     Runnable   run  
  */
 @Override
 protected Integer compute() {
  //    
  if(jobList.size()<=count){
   executeJob();
   return jobList.size();
  }else{
   //      ,        
   List<RecursiveTask<Long>> fork = new LinkedList<RecursiveTask<Long>>();
   //     ,       
   int countJob=jobList.size()/2;
   List<T> leftList=jobList.subList(0, countJob);
   List<T> rightList=jobList.subList(countJob, jobList.size());
   //    
   Job leftJob=new Job<>(count,leftList);
   Job rightJob=new Job<>(count,rightList);
   //    
   leftJob.fork();
   rightJob.fork();
   return Integer.parseInt(leftJob.join().toString())
     +Integer.parseInt(rightJob.join().toString());
  }
 }
 /**
  *       
  */
 private void executeJob() {
  for(T job:jobList){
   try {
    Thread.sleep(1);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
テストの結果は以下の通りです。
 
4)JDK 8並列流
並行流はjdk 8の新しい特性の一つであり、一つの順序で実行される流れを一つの同時流に変えて、parallelを呼び出すことによって実現されると考えられています。並列ストリームは、1つのストリームを複数のデータブロックに分割し、異なるスレッドで異なるデータブロックのストリームを処理し、最後に各ブロックのデータストリームを合成する処理結果は、Fork/Joinフレームワークと同様である。
パラレルフローはデフォルトでは、パブリックスレッド池ForkJoinPoolを使用しています。彼のスレッド数は、マシンの核数に応じて、スレッド数のサイズを適切に調整することができます。スレッド数の調整は以下のように行われます。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
以下はコードの実現過程で、非常に簡単です。

public class Test4 {
private static List<FileInfo> fileList= new ArrayList<FileInfo>();
public static void main(String[] args) {
//    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
   createFileInfo();
   long startTime=System.currentTimeMillis();
   fileList.parallelStream().forEach(e ->{
     try {
        Thread.sleep(1);
     } catch (InterruptedException f) {
        f.printStackTrace();
     }
   });
   long endTime=System.currentTimeMillis();
   System.out.println("jdk8     :"+(endTime-startTime)+"ms");
}
private static void createFileInfo(){
   for(int i=0;i<30000;i++){
     fileList.add(new FileInfo("      ","jpg","101522","md5"+i,"1"));
   }
}
}
以下はテストです。最初はスレッドの数が設定されていません。標準設定を採用しています。テスト結果は以下の通りです。
 
結果は理想的ではなく、時間がかかります。次にスレッドの数を設定します。すなわち、次のコードを追加します。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
続いてテストを行います。結果は以下の通りです。
 
今回は時間が少なくて理想的です。
三、まとめ
以上のように、単スレッドを参考にして、最も長い時間がかかるのは元のFork/Joinフレームであり、ここではスレッドプールの数を配置しているが、効果は正確にスレッド池の数のJDK 8の並列ストリームを配置している。並列流はコードが分かりやすく、余分なforサイクルを書く必要がなく、一つのparallel Stream方法が全部できて、コード量が大幅に減少しました。実は、並列流の底はまだFork/Joinフレームを使っています。これは開発の過程で様々な技術を活用して、様々な技術の長所と短所を明確にして、より良いサービスを提供できるように要求します。