Javaシリアルプログラム並列化実行

9549 ワード

プログラム開発過程においては、まずmethod 1を実行して結果result 1を得た後、method 2を実行して結果result 2を得た後、result 1とresult 2の結果に従ってプログラムの次の実行を判定する場合が多い.ここで、method 1とmethod 2は互いに関連していない.すなわち、method 1の実行とmethod 2の実行位置は、プログラムの実行結果に影響を与えることなく調整することができる.
1、シリアルプログラム
        従来の意味での書き方では、シリアル実行のプログラム形態が得られることが多く、プログラムの総実行時間はmethod 1の実行時間time 1にmethod 2の実行時間time 2を加えることで、総実行時間time=time 1+time 2となる.シリアルのプログラム形態を得た.
import com.yang.domain.BaseResult;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Description:
 * @Author: yangzl2008
 * @Date: 2016/1/9 19:06
 */
public class Serial {

    @Test
    public void test() {

        long start = System.currentTimeMillis();

        BaseResult baseResult1 = method1();//     1,   time1
        BaseResult baseResult2 = method2();//     2,   time2

        long end = System.currentTimeMillis();

        //    time = time1 + time2
        System.out.println("baseResult1 is " + baseResult1 + "
baseResult2 is " + baseResult2 + "
time cost is " + (end - start)); } private BaseResult method1() { BaseResult baseResult = new BaseResult(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } baseResult.setCode(1); baseResult.setMsg("method1"); return baseResult; } private BaseResult method2() { BaseResult baseResult = new BaseResult(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } baseResult.setCode(1); baseResult.setMsg("method2"); return baseResult; } }
実行結果:
baseResult1 is BaseResult{code=1, msg='method1'}
baseResult2 is BaseResult{code=1, msg='method2'}
time cost is 2000

2、シリアルプログラムのマルチスレッド移行
          このコードは最適化できる場所ではないでしょうか.プログラムの実行効率を速め、プログラムの実行時間を低減する.ここでmethod 1とmethod 2は相互に関連していません.すなわち、method 1の実行とmethod 2の実行位置はプログラムの実行結果に影響を与えることなく調整できます.作成スレッド1のためにmethod 1を実行し、スレッド2を作成してmethod 2を実行することはできませんか.method 1とmethod 2は結果を得る必要があるので、Callableインタフェースを使用してFuture.get()を使用する必要があります.実行結果が得られるが、実際にはFuture.get()は、プログラムが結果を返す前にブロックされている.すなわち、スレッド1がmethod 1方式を実行すると、プログラムがFuture.get()を呼び出したため、method 1が結果result 1を返すまでここで待機し、スレッド2がmethod 2を実行することができ、同様に、Future.get()もmethod 2の結果result 2が戻るまで待機する.スレッド1をオンにすると、スレッド2をオンにするとmethod 1,method 2を同時実行する効果は得られず、逆にプログラムがスレッドをオンにすることでプログラムの実行時間を多く消費し、プログラムの実行時間time=time 1+time 2+time(スレッドオン時間)となる.そこでシリアルプログラムの遷移状態を得た.
import com.yang.domain.BaseResult;
import org.junit.Test;

import java.lang.reflect.Method;
import java.util.concurrent.*;

/**
 * @Description:
 * @Author: yangzl2008
 * @Date: 2016/1/9 19:13
 */
public class SerialCallable {

    @Test
    public void test01() throws Exception {
        //         
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        long start = System.currentTimeMillis();

        //       
        Future future1 = executorService.submit(new Task(this, "method1", null));
        //   ,      。  time1
        BaseResult baseResult1 = future1.get();
        //       
        Future future2 = executorService.submit(new Task(this, "method1", null));
        //   ,      。  time2
        BaseResult baseResult2 = future2.get();

        long end = System.currentTimeMillis();

        //     time = time1 + time2 + time(      )
        System.out.println("baseResult1 is " + baseResult1 + "
baseResult2 is " + baseResult2 + "
time cost is " + (end - start)); } public BaseResult method1() { BaseResult baseResult = new BaseResult(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } baseResult.setCode(1); baseResult.setMsg("method1"); return baseResult; } public BaseResult method2() { BaseResult baseResult = new BaseResult(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } baseResult.setCode(1); baseResult.setMsg("method2"); return baseResult; } class Task implements Callable { private Object object; private Object[] args; private String methodName; public Task(Object object, String methodName, Object[] args) { this.object = object; this.args = args; this.methodName = methodName; } @Override public T call() throws Exception { Method method = object.getClass().getMethod(methodName); return (T) method.invoke(object, args); } } }
実行結果:
baseResult1 is BaseResult{code=1, msg='method1'}
baseResult2 is BaseResult{code=1, msg='method1'}
time cost is 2001

3、並列プログラム
        method 1を実行するときに同時にmethod 2を実行し、結果を得て処理する方法はありますか?問題の出典に戻り、プログラムはまずmethod 1を実行して結果result 1を得た後、method 2を実行して結果result 2を得た後、result 1とresult 2の結果に従ってプログラムの次の実行を判定し、最終的に私たちが得た結果はresult 1とresult 2であり、その後、次の操作を行うと、result 1とresult 2を得たとき、method 1とmethod 2は実際には同時実行できます.つまり、私はまずmethod 1を実行してからmothod 2を実行します.私は彼らの戻り結果にかかわらず、result 1とresult 2を操作するときだけ、プログラムはFuture.get()メソッドを呼び出します(このメソッドは結果が戻るまで待っています).これは
遅延ロードの考え方、
Hibernateの属性の遅延ロードと一致しています.つまり、属性Aに対しては、普段は私が使わないときは割り当てられません.私が使っているときだけ、SQLクエリーで割り当てられます.そこで,同時実行のプログラム形態を得た.
        HibernateもCGLIBを用いて遅延ロードを実現しているので,CGLIBの遅延ロードクラスを用いてシリアルのプログラムを並列化することも考えられる.
import com.yang.domain.BaseResult;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.LazyLoader;
import org.junit.Test;

import java.lang.reflect.Method;
import java.util.concurrent.*;

/**
 * @Description:
 * @Author: yangzl2008
 * @Date: 2016/1/9 19:39
 */
public class Parallel {

    @Test
    public void test02() throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        long start = System.currentTimeMillis();

        //       
        Future future1 = executorService.submit(new Task(this, "method1", null));
        //    ,    ,baseResult1 cglib    ,      ,                 。
        BaseResult baseResult1 = futureGetProxy(future1, BaseResult.class);
        //       
        Future future2 = executorService.submit(new Task(this, "method2", null));
        //    ,    ,baseResult1 cglib    ,      ,                 。
        BaseResult baseResult2 = futureGetProxy(future2, BaseResult.class);
        //      baseResult1 baseResult2
        System.out.println("baseResult1 is " + baseResult1 + "
baseResult2 is " + baseResult2); long end = System.currentTimeMillis(); // time = max(time1,time2) System.out.println("time cost is " + (end - start)); } private T futureGetProxy(Future future, Class clazz) { Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(clazz); return (T) enhancer.create(clazz, new FutureLazyLoader(future)); } /** * * @param */ class FutureLazyLoader implements LazyLoader { private Future future; public FutureLazyLoader(Future future) { this.future = future; } @Override public Object loadObject() throws Exception { return future.get(); } } public BaseResult method1() { BaseResult baseResult = new BaseResult(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } baseResult.setCode(1); baseResult.setMsg("method1"); return baseResult; } public BaseResult method2() { BaseResult baseResult = new BaseResult(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } baseResult.setCode(1); baseResult.setMsg("method2"); return baseResult; } class Task implements Callable { private Object object; private Object[] args; private String methodName; public Task(Object object, String methodName, Object[] args) { this.object = object; this.args = args; this.methodName = methodName; } @Override public T call() throws Exception { Method method = object.getClass().getMethod(methodName); return (T) method.invoke(object, args); } } }
実行結果:
baseResult1 is BaseResult{code=1, msg='method1'}
baseResult2 is BaseResult{code=1, msg='method2'}
time cost is 1057

4、考える
        以上、method 1,method 2をそれぞれ新しいスレッドで実行し、最終的に結果が得られた後、実行を継続する実装は様々であり、J.U.CにおけるCountDownLatchとCyclicBarrierはこのような考え方を実現できる典型的なツールクラスである.
本稿では,別の実装案を考案しただけであるが,既に成形されたプログラム,すなわちプログラムコードを変更しない場合には,AOP,CGLIBなどを用いて既存のプログラムの並列化を実現し,より容易に実現できると考える.SpringのAOPインプリメンテーション方式を用いて,戻り値をCGLIBで遅延ロードしながら,新しい線Callable行程を開いて被エージェントの方法を実行する.
5、参考
実戦CGLibシリーズのproxy編(三):遅延ロードLazyLoader
実戦cglib
AOP-CGLIBを実現
プログラムソースダウンロード