C#パラレルプログラミングのBarrierの使用-バリア同期による同時タスク


基本情報
Barrierは.Netが提供する常に同時化されたメカニズムであり、複数のタスクが異なる段階の同時化作業を同期させることができます.
ここでのポイントは「複数のタスク」と「異なる段階」です.4つの同じタスク(Task)があると仮定し、各タスクには4つのフェーズ(Phase)があり、彼らが同時に作業している場合、すべてのタスクの同じステップが完了した場合にのみ、すべてのタスクが次のステップを開始することができます.
図に示すように、
ここでBarrierは.NetFrameworkが提供するメカニズムです.垣根(バリア)のように、すべてのタスクのフェーズを隔離し、現在のフェーズは完了せず、次のフェーズは開始しません.
コードの例:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
    class Program
    {
        private static void Phase0Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ===== Phase 0", TaskID);
        }

        private static void Phase1Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ***** Phase 1", TaskID);
        }

        private static void Phase2Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID);
        }

        private static void Phase3Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID);
        }

        private static int _TaskNum = 4;
        private static Task[] _Tasks;
        private static Barrier _Barrier;


        static void Main(string[] args)
        {
            _Tasks = new Task[_TaskNum];
            _Barrier = new Barrier(_TaskNum, (barrier) =>
            {
                Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", 
                                  _Barrier.CurrentPhaseNumber);
            });

            for (int i = 0; i < _TaskNum; i++)
            {
                _Tasks[i] = Task.Factory.StartNew((num) =>
                {
                    var taskid = (int)num;

                    Phase0Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase1Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase2Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase3Doing(taskid);
                    _Barrier.SignalAndWait();

                }, i);
            }

            var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
                {
                    Task.WaitAll(_Tasks);
                    Console.WriteLine("========================================");
                    Console.WriteLine("All Phase is completed");

                    _Barrier.Dispose();
                });

            finalTask.Wait();

            Console.ReadLine();
        }
    }
}

テスト結果:
バリア使用時の異常処理
バリアに入ると、動作するコードに異常が発生し、この異常はBarrierPostPhaseExceptionにパッケージされ、すべてのタスクがこの異常をキャプチャすることができます.元の例外は、NarrierPostPhaseExceptionオブジェクトのInnerExceptionからアクセスできます.
コード例:例外が投げ出された場所と、例外がキャプチャされた場所に注意してください.
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
    class Program
    {
        private static void Phase0Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ===== Phase 0", TaskID);
        }

        private static void Phase1Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ***** Phase 1", TaskID);

        }

        private static void Phase2Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID);
        }

        private static void Phase3Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID);
        }

        private static int _TaskNum = 4;
        private static Task[] _Tasks;
        private static Barrier _Barrier;


        static void Main(string[] args)
        {
            _Tasks = new Task[_TaskNum];
            _Barrier = new Barrier(_TaskNum, (barrier) =>
            {
                Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", 
                                  _Barrier.CurrentPhaseNumber);
                if (_Barrier.CurrentPhaseNumber == 1)
                    throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!");
            });

            for (int i = 0; i < _TaskNum; i++)
            {
                _Tasks[i] = Task.Factory.StartNew((num) =>
                {
                    var taskid = (int)num;

                    Phase0Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase1Doing(taskid);
                    try
                    {
                        _Barrier.SignalAndWait();
                    }
                    catch (BarrierPostPhaseException bpp_ex)
                    {
                        Console.WriteLine("Got an Exception in Phase1: " + bpp_ex.InnerException);
                    }

                    Phase2Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase3Doing(taskid);
                    _Barrier.SignalAndWait();

                }, i);
            }

            var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
                {
                    Task.WaitAll(_Tasks);
                    Console.WriteLine("========================================");
                    Console.WriteLine("All Phase is completed");

                    _Barrier.Dispose();
                });

            finalTask.Wait();

            Console.ReadLine();
        }
    }
}

Barrierタイムアウトの処理について
ここではBarrier.SignalAndWait(TIMEOUT))を使用してタイムアウトを判断します.サンプルコードではPhase 2のTask 3が10秒待ち、タイムアウト時間が2秒を超えると、BarrierでTaskのPhase 2がタイムアウトしてエラーが返されることが確認されます.
サンプルコード:
using System;
using System.Threading;
using System.Threading.Tasks;


namespace Sample5_1_barrier
{
    class Program
    {

        private static void Phase0Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ===== Phase 0", TaskID);
        }

        private static void Phase1Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ***** Phase 1", TaskID);
        }

        private static void Phase2Doing(int TaskID)
        {
            int i = 0;
            Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID);
            if (TaskID == 3)
                while (i < 10)
                {
                    System.Threading.Thread.Sleep(1000);
                    i++;
                }
        }

        private static void Phase3Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID);
        }

        private static int _TaskNum = 4;
        private static Task[] _Tasks;
        private static Barrier _Barrier;
        private static int TIMEOUT = 2000;


        static void Main(string[] args)
        {
            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;

            _Tasks = new Task[_TaskNum];
            _Barrier = new Barrier(_TaskNum, (barrier) => { Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", _Barrier.CurrentPhaseNumber); //if (_Barrier.CurrentPhaseNumber == 1) // throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!"); }); for (int i = 0; i < _TaskNum; i++) { _Tasks[i] = Task.Factory.StartNew((num) => { var taskid = (int)num; Phase0Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 0 canceled: ", ct); } Phase1Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 1 canceled: ", ct); } Phase2Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 2 canceled: ", ct); } Phase3Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 3 canceled: ", ct); } }, i, ct); } var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("========================================"); Console.WriteLine("All Phase is completed"); }, ct); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { _Barrier.Dispose(); } Console.ReadLine(); } } }