forkjoin-分岐マージフレームワーク


前言
マルチプロセッサ時代の到来に伴い、マルチプロセッサ並列計算をサポートするコンピュータが増えている.言語レベルでもサポートされており、fork/joinはjava言語がマルチプロセッサ計算を利用する実現フレームワークである.詳細については、公式ドキュメントを参照してください.
Fork/Join
fork/joinフレームワークはExecutorServiceインタフェースの実装であり、マルチプロセッサの利点を利用するのに役立ちます.これは、より小さな部分に再帰的に分解できる作業のために設計されています.目的は、使用可能なすべての処理能力を使用してアプリケーションのパフォーマンスを向上させることです.
任意のExecutorService実装と同様に、fork/joinフレームワークは、スレッドプール内の作業スレッドにタスクを割り当てます.fork/joinフレームワークは、仕事を盗むアルゴリズムを使用しているため、ユニークです.ワークスレッドはやるべきことを使い果たし、他の忙しいスレッドからタスクを盗むことができます.
fork/joinフレームワークの中心はForkJoinPoolクラスであり、AbstractExecutorServiceクラスの拡張である.ForkJoinPoolはコアの作業盗難アルゴリズムを実現し、ForkJoinTaskプロセスを実行することができる.
きほんしよう
fork/joinフレームワークを使用する最初のステップは、一部の作業を実行するコードを記述することです.あなたのコードは次の偽コードに似ているはずです.
if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

このコードは、ForkJoinTaskサブクラスにパッケージされ、通常、より特殊なタイプの1つ、RecursiveTask(結果を返すことができる)、またはRecursiveActionが使用される.ForkJoinTaskサブクラスが準備された後、完了するすべての作業を示すオブジェクトを作成し、ForkJoinPoolインスタンスに渡すinvoke()メソッド.
ぼやけた画像
fork/joinフレームワークがどのように動作するかを理解するには、次の例を考慮してください.画像をぼやけたいとします.元のソース画像は整数配列で表され、各整数は単一の画素の色値を含む.ぼかしたターゲット画像もソース画像と同じ大きさの整数配列で表される.
ブラーを実行するには、ソースアレイを介して1ピクセルずつ動作します.各ピクセルは、その周辺のピクセル(赤、緑、青のコンポーネントが平均)と平均され、ターゲット配列に結果が配置されます.画像は大きな配列であるため、このプロセスには時間がかかる可能性があります.このアルゴリズムはfork/joinフレームワークを使用して実現することで、マルチプロセッサシステム上の同時処理を利用することができます.ここには可能な実装があります.
public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

抽象的なcompute()メソッドが実装され、ファジイを直接実行したり、より小さなタスクに分割したりすることができます.単純な配列長しきい値は、作業を実行するか分割するかを決定するのに役立ちます.
protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

前述の方法がRecursiveActionクラスのサブクラスにある場合、ForkJoinPoolで実行されるタスクを設定するのは簡単であり、以下のステップを含む.
  • は、完了するすべての作業を示すタスクを作成します.
  • // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
    
  • タスクを実行するForkJoinPoolを作成します.
  • ForkJoinPool pool = new ForkJoinPool();
    
  • タスクを実行します.
  • pool.invoke(fb);
    

    ターゲット画像ファイルを作成する追加のコードを含む完全なソースコードについては、ForkBlurの例を参照してください.
    標準実装
    fork/joinフレームワークを使用して、マルチプロセッサシステム上で同時実行するタスクのカスタムアルゴリズム(前節のForkBlur.javaの例)を実装するほか、Java SEでは、fork/joinフレームワークを使用して実装されている一般的な有用な特性もいくつかあります.Java SE 8に導入されたこのようなインプリメンテーションの1つは、java.util.Arraysクラスによって使用されるそのparallelSort()メソッドである.これらの方法はsort()と類似しているがfork/joinフレームワークにより同時性を利用する.マルチプロセッサシステムで実行すると、大きな配列の並列ソートは順序ソートよりも速くなります.しかし、これらの方法でfork/joinフレームワークをどのように使用するかはJavaチュートリアルの範囲を超えています.これらの情報については、Java APIドキュメントを参照してください.
    fork/joinフレームワークのもう一つの実装は、Java SE 8のリリースで予定されているLambdaプロジェクトの一部であるjava.util.streamsパッケージの方法によって使用される.詳細については、Lambda式セクションを参照してください.ForkJoinTaskの方法
    fork/joinフレームワークの難点はタスクの定義にある.ForkJoinTaskの方法を詳しく見てみましょう.
    サブクラス実装に使用される3つの虚メソッド.
    public abstract V getRawResult();
    protected abstract void setRawResult(V value);
    protected abstract boolean exec();
    

    フォーク:非同期でタスクを実行する
    public final ForkJoinTask<V> fork()
    

    連結:非同期実行タスクの戻り値の取得
    public final V join()
    

    実行:タスクを直接実行し、戻り値を取得します.
    public final V invoke()
    

    RecursiveAction
    戻り値のない再帰、例えば以下の【印刷】の例:
      public static class Print extends RecursiveAction {
        private final List<Integer> list;
    
        public Print(List<Integer> list) {
          this.list = list;
        }
    
        @Override
        protected void compute() {
          //       ,    ,        
          if (list.size() == 1) {
            try {
              Thread.sleep(5000);//      
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
            System.err.println(Thread.currentThread().getName() + "|" + list.get(0));
            return;
          }
          //      ,
          int middle = list.size() / 2;
          List<Integer> head = Take.take(list, 0, middle);
          List<Integer> tail = Take.take(list, middle, list.size());
          Print one = new Print(head);
          Print two = new Print(tail);
          two.fork();//  
          one.invoke();//      
          two.join();//  
        }
      }
    

    RecursiveTask
    戻り値の再帰があります.たとえば、次の「フィボナッチ数」の例です.
      public static class Fibonacci extends RecursiveTask<BigDecimal> {
        private final int n;
    
        Fibonacci(int n) {
          this.n = n;
        }
    
        @Override
        protected BigDecimal compute() {
          System.err.println(Thread.currentThread().getName() + "=" + n);
    	  //     ,    ,       
          if (n <= 2)	//         1
            return BigDecimal.ONE;
          Fibonacci f1 = new Fibonacci(n - 1);
          Fibonacci f2 = new Fibonacci(n - 2);
          f2.fork();//  
          return f1.invoke().add(f2.join());//                      
        }
      }
    

    実践:mavenの上位20倉庫を取得
    maven centerに登録されている最初の20の倉庫アドレスは、次のURLで取得できます.
  • https://mvnrepository.com/repos?p=1
  • https://mvnrepository.com/repos?p=2

  • 1つのサイトで10個取得できます.順番に取得する方法を採用すると、2倍の時間がかかります.fork/joinフレームワークを採用することで、並列に取得でき、時間を大幅に節約できます.
    次に、1ページのデータを取得する方法を示します.
    public static List<Map<String, Object>> repository(int page) {
      return Try.ofCallable(() -> {
        Document document = Jsoup.connect("https://mvnrepository.com/repos?p=" + page).get();
        Element body = document.body();
        Elements orders = body.select(".im-title span");
        Elements titles = body.select(".im-title a");
        Elements urls = body.select(".im-subtitle");
        List<Map<String, Object>> list = new ArrayList<>();
        Do.loop(t -> {
          Map<String, Object> map = new HashMap<>();
          String order = orders.get(t).text();
          map.put("order", Integer.parseInt(order.substring(0, order.indexOf("."))));
          map.put("name", titles.get(t).text());
          map.put("url", urls.get(t).text());
          list.add(map);
        }, titles.size());
        return list;
      }).getOrElse(new ArrayList<>());
    }
    

    次に、分岐マージタスクを行います.
    public static class Task extends RecursiveTask<List<Map<String, Object>>> {
    
      private final List<Integer> pages;
    
      public Task(List<Integer> pages) {
        this.pages = pages;
      }
    
      @Override
      protected List<Map<String, Object>> compute() {
      	//       ,    ,        
        if (pages.size() == 1) {
          return repository(pages.get(0));
        }
        int middle = pages.size() / 2;
        Task task1 = new Task(pages.subList(0, middle));
        Task task2 = new Task(pages.subList(middle, pages.size()));
        List<Map<String, Object>> res = new ArrayList<>();
        task2.fork();	//  2  (  )
        res.addAll(task1.invoke());		//    1       
        res.addAll(task2.join());	//    2     (            )
        return res;
      }
    }
    

    最後にForkJoinPoolが実行します.
      public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        List<Map<String, Object>> res = pool.invoke(new Task(Arrays.asList(1, 2)));
        System.err.println(res);
      }
    

    結果は次のとおりです.
    [{name=Central, url=https://repo1.maven.org/maven2/, order=1}, {name=Sonatype, url=https://oss.sonatype.org/content/repositories/releases/, order=2}, {name=Spring Plugins, url=https://repo.spring.io/plugins-release/, order=3}, {name=Spring Lib M, url=https://repo.spring.io/libs-milestone/, order=4}, {name=Hortonworks, url=https://repo.hortonworks.com/content/repositories/releases/, order=5}, {name=Atlassian, url=https://maven.atlassian.com/content/repositories/atlassian-public/, order=6}, {name=JCenter, url=https://jcenter.bintray.com/, order=7}, {name=JBossEA, url=https://repository.jboss.org/nexus/content/repositories/ea/, order=8}, {name=JBoss Releases, url=https://repository.jboss.org/nexus/content/repositories/releases/, order=9}, {name=WSO2 Releases, url=https://maven.wso2.org/nexus/content/repositories/releases/, order=10}, {name=Spring Lib Release, url=https://repo.spring.io/libs-release/, order=11}, {name=WSO2 Public, url=https://maven.wso2.org/nexus/content/repositories/public/, order=12}, {name=IBiblio, url=https://maven.ibiblio.org/maven2/, order=13}, {name=XWiki Releases, url=https://maven.xwiki.org/releases/, order=14}, {name=Kotlin Dev, url=https://dl.bintray.com/kotlin/kotlin-dev/, order=15}, {name=Nuxeo, url=https://maven-eu.nuxeo.org/nexus/content/repositories/public-releases/, order=16}, {name=Clojars, url=https://clojars.org/repo/, order=17}, {name=Gradle Plugins, url=https://plugins.gradle.org/m2/, order=18}, {name=Geomajas, url=http://maven.geomajas.org/, order=19}, {name=Redhat GA, url=https://maven.repository.redhat.com/ga/, order=20}]
    

    これらの倉庫をmavenのsettings.xmlに入れることができ、jarが見つからない心配はありません.