C#とJavaのマルチスレッド制御クラス


このクラスは複数のスレッドをまとめて実行し、すべてのスレッドの実行が完了するまで呼び出し元をブロックします。
 
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Runtime.CompilerServices;

namespace Pkysoft.Utility
{
    /// <summary>
    /// Runs a set of <see cref="ThreadStart"/> delegates through <see cref="ThreadRunner"/>
    /// instances and blocks the caller until every one of them has completed, failed or
    /// timed out. All failures are collected and reported together by throwing a single
    /// <see cref="MultiThreadManagerException"/> after the last delegate has been accounted for.
    /// usage:
    /// <code>
    /// <![CDATA[
    ///        List<ThreadStart> list = new List<ThreadStart>();
    ///        for (int i = 0; i < 1000; i++)
    ///        {
    ///            list.Add(delegate(){new TestThread().Start();});
    ///        }
    ///        MultiThreadManager mtm = new MultiThreadManager(list, "mythread", 0);
    ///         try
    ///         {
    ///             mtm.RunAndWait();
    ///         }
    ///         catch (MultiThreadManagerException ex)
    ///         {
    ///             Console.WriteLine("Error:" + ex);
    ///         }
    /// ]]>
    /// </code>
    /// </summary>
    public class MultiThreadManager
    {
        List<ThreadStart> Threads;
        string NamePrefix;
        int EachThreadTimeout = 0;

        /// <summary>
        /// Creates a manager for the given delegates.
        /// </summary>
        /// <param name="threads">The delegates to execute; must not be null.</param>
        /// <param name="namePrefix">Prefix of the worker names; "-" and the zero-based index are appended.</param>
        /// <param name="timeout">
        /// Per-delegate timeout handed to each <see cref="ThreadRunner"/>; a value of 0 or less
        /// means there is no timeout.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="threads"/> is null.</exception>
        public MultiThreadManager(List<ThreadStart> threads, string namePrefix, int timeout)
        {
            if (threads == null)
            {
                throw new ArgumentNullException("threads");
            }
            this.Threads = threads;
            this.NamePrefix = namePrefix;
            this.EachThreadTimeout = timeout;
        }

        /// <summary>
        /// Starts every delegate and blocks until each one has reported completion or failure.
        /// </summary>
        /// <exception cref="MultiThreadManagerException">
        /// At least one delegate failed or timed out; the exception carries one entry per failure.
        /// </exception>
        public void RunAndWait()
        {
            int size = this.Threads.Count;
            if (size == 0)
            {
                // No worker would ever signal the latch, so waiting on it would block forever.
                return;
            }

            List<MultiThreadManagerError> exceptions = new List<MultiThreadManagerError>();
            // The error handler runs on the runner threads, several of which may fail at the
            // same time; List<T> is not safe for concurrent writers, so guard it.
            object exceptionsLock = new object();
            CountdownLatch cdl = new CountdownLatch(size);

            for (int i = 0; i < size; i++)
            {
                ThreadStart t = this.Threads[i];
                string threadName = this.NamePrefix + "-" + i;

                ThreadCompletedHandler threadCompletedHandler = new ThreadCompletedHandler(delegate(string name)
                {
                    Console.WriteLine("Thread " + name + " completed");
                    cdl.Signal();
                });
                ThreadErrordHandler threadErrorHandler = new ThreadErrordHandler(delegate(string name, Exception ex)
                {
                    lock (exceptionsLock)
                    {
                        exceptions.Add(new MultiThreadManagerError(name, ex));
                    }
                    string detail = ex != null ? ex.Message : "unknown error";
                    Console.WriteLine("Thread " + name + " failed(" + detail + ")");
                    cdl.Signal();
                });

                ThreadRunner tr = new ThreadRunner(t, threadName, this.EachThreadTimeout, threadCompletedHandler, threadErrorHandler);
                tr.run();
            }

            cdl.Wait();

            lock (exceptionsLock)
            {
                if (exceptions.Count > 0)
                {
                    throw new MultiThreadManagerException(exceptions);
                }
            }
        }
    }
    /// <summary>
    /// Pairs the name of a failed worker with the exception that was reported for it.
    /// </summary>
    public class MultiThreadManagerError
    {
        /// <summary>Name of the worker that failed.</summary>
        public string Name;

        /// <summary>The exception reported for that worker.</summary>
        public Exception Exception;

        /// <summary>
        /// Creates an error record.
        /// </summary>
        /// <param name="name">Name of the worker that failed.</param>
        /// <param name="ex">The exception reported for that worker.</param>
        public MultiThreadManagerError(string name, Exception ex)
        {
            Name = name;
            Exception = ex;
        }
    }
    /// <summary>
    /// A one-shot latch: <see cref="Wait"/> blocks until <see cref="Signal"/> has been called
    /// as many times as the count given to the constructor.
    /// </summary>
    public class CountdownLatch
    {
        private int remain;
        private EventWaitHandle evt;

        /// <summary>
        /// Creates a latch that opens after <paramref name="count"/> signals.
        /// </summary>
        /// <param name="count">
        /// Number of signals to wait for. With a count of 0 or less there is nothing to wait
        /// for, so the latch starts open and <see cref="Wait"/> returns immediately.
        /// </param>
        public CountdownLatch(int count)
        {
            remain = count;
            // Start signalled when no Signal() call could ever bring the counter to zero;
            // otherwise Wait() would block forever.
            evt = new ManualResetEvent(count <= 0);
        }

        /// <summary>
        /// Records that one participant has finished; the call that brings the counter to
        /// zero releases all waiters.
        /// </summary>
        public void Signal()
        {
            // The last thread to signal also sets the event.
            if (Interlocked.Decrement(ref remain) == 0)
            {
                evt.Set();
            }
        }

        /// <summary>
        /// Blocks the calling thread until the latch has opened.
        /// </summary>
        public void Wait()
        {
            evt.WaitOne();
        }
    }

    /// <summary>Callback invoked with the worker name when a worker finished without error.</summary>
    public delegate void ThreadCompletedHandler(string name);
    /// <summary>Callback invoked with the worker name and the exception when a worker failed or timed out.</summary>
    public delegate void ThreadErrordHandler(string name, Exception ex);
    /// <summary>
    /// Reported when a worker thread did not finish within its timeout.
    /// </summary>
    public class ThreadRunnerTimeException : Exception
    {
        /// <summary>Name of the worker thread that timed out.</summary>
        public String Name;

        /// <summary>
        /// Creates the exception with the message "Thread {name} timeout.".
        /// </summary>
        /// <param name="name">Name of the worker thread that timed out.</param>
        public ThreadRunnerTimeException(string name)
            : base(string.Concat("Thread ", name, " timeout."))
        {
            Name = name;
        }
    }
    /// <summary>
    /// Runs one <see cref="ThreadStart"/> on a dedicated, named worker thread, supervises it
    /// from a second thread, and reports the outcome through callbacks: either the completed
    /// handler or the error handler is invoked once the outcome is known.
    /// </summary>
    public class ThreadRunner
    {

        ThreadStart ThreadStart;
        string ThreadName;
        int Timeout;
        ThreadCompletedHandler CompletedHandler;
        ThreadErrordHandler ErrorHandler;

        // Written by the worker thread, read by the supervising thread. Volatile so the
        // supervisor also sees the latest value when it stops waiting because of a timeout.
        volatile Exception ThreadException = null;

        /// <summary>
        /// Creates a runner; nothing is started until <see cref="run"/> is called.
        /// </summary>
        /// <param name="ts">The work to execute.</param>
        /// <param name="name">Name given to the worker thread and passed to the callbacks.</param>
        /// <param name="timeout">
        /// Timeout passed to <c>Thread.Join(int)</c> (milliseconds); a value of 0 or less waits
        /// without a limit.
        /// </param>
        /// <param name="completed">Invoked with the name when the work finished without error.</param>
        /// <param name="error">Invoked with the name and the exception when the work failed or timed out.</param>
        public   ThreadRunner(ThreadStart ts, string name, int timeout, ThreadCompletedHandler completed, ThreadErrordHandler error)
        {
            this.ThreadStart = ts;
            this.ThreadName = name;
            this.Timeout = timeout;
            this.CompletedHandler = completed;
            this.ErrorHandler = error;
        }

        /// <summary>
        /// Starts the supervising thread and returns immediately. The supervisor starts the
        /// worker, waits for it, and then invokes the matching callback. On a timeout the
        /// error handler receives a <see cref="ThreadRunnerTimeException"/>; the worker thread
        /// itself is not stopped.
        /// </summary>
        public void run()
        {
            new Thread(delegate()
            {
                Exception failure;
                try
                {
                    failure = RunWorkerAndWait();
                }
                catch (Exception ex)
                {
                    failure = ex;
                }

                if (failure == null)
                {
                    try
                    {
                        this.CompletedHandler(this.ThreadName);
                        return;
                    }
                    catch (Exception ex)
                    {
                        // A failing completion callback is reported like a failing worker.
                        failure = ex;
                    }
                }

                // Hand the captured exception over as-is. Re-throwing it here just to catch
                // it again would overwrite the stack trace recorded on the worker thread.
                this.ErrorHandler(this.ThreadName, failure);
            }).Start();
        }

        /// <summary>
        /// Starts the worker thread, waits for it (bounded by the timeout when one is set) and
        /// returns the failure to report, or null when the work finished without error.
        /// </summary>
        private Exception RunWorkerAndWait()
        {
            Thread worker = new Thread(new ThreadStart(StartThreadStart));
            worker.Name = this.ThreadName;
            worker.Start();

            Console.WriteLine("Thread " + worker.Name + " started");

            bool finished = true;
            if (this.Timeout > 0)
            {
                finished = worker.Join(this.Timeout);
            }
            else
            {
                worker.Join();
            }

            Exception failure = this.ThreadException;
            if (!finished && failure == null)
            {
                failure = new ThreadRunnerTimeException(this.ThreadName);
            }
            return failure;
        }

        /// <summary>
        /// Worker thread entry point: invokes the delegate and records any exception it throws
        /// so the supervising thread can report it.
        /// </summary>
        protected void StartThreadStart()
        {
            try
            {
                this.ThreadStart.Invoke();
            }
            catch (Exception ex)
            {
                this.ThreadException = ex;
            }
        }

    }
    /// <summary>
    /// Aggregates the failures of a group of workers. The message lists the failed worker
    /// names, and the first available worker exception is exposed as the inner exception.
    /// </summary>
    public class MultiThreadManagerException : Exception
    {
        /// <summary>The individual failures, exactly as passed to the constructor.</summary>
        public List<MultiThreadManagerError> Errors;

        /// <summary>
        /// Creates the exception from the collected failures.
        /// </summary>
        /// <param name="errors">The collected failures; may be null or empty.</param>
        public MultiThreadManagerException(List<MultiThreadManagerError> errors)
            : base(BuildMessage(errors), FirstException(errors))
        {
            this.Errors = errors;
        }

        /// <summary>Builds a readable summary so that printing the exception shows what failed.</summary>
        private static string BuildMessage(List<MultiThreadManagerError> errors)
        {
            if (errors == null || errors.Count == 0)
            {
                return "One or more threads failed.";
            }

            StringBuilder text = new StringBuilder();
            text.Append(errors.Count).Append(" thread(s) failed: ");
            for (int i = 0; i < errors.Count; i++)
            {
                if (i > 0)
                {
                    text.Append("; ");
                }

                MultiThreadManagerError error = errors[i];
                if (error == null)
                {
                    text.Append("(unknown)");
                    continue;
                }

                text.Append(error.Name);
                if (error.Exception != null)
                {
                    text.Append(" (").Append(error.Exception.Message).Append(")");
                }
            }
            return text.ToString();
        }

        /// <summary>Returns the first non-null worker exception, or null when there is none.</summary>
        private static Exception FirstException(List<MultiThreadManagerError> errors)
        {
            if (errors == null)
            {
                return null;
            }

            foreach (MultiThreadManagerError error in errors)
            {
                if (error != null && error.Exception != null)
                {
                    return error.Exception;
                }
            }
            return null;
        }

    }
}
 
 
 
 
package pkysoft.common.thread;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Runs a group of threads and blocks the caller until all of them have finished executing.
 * This class can be used to wait on a group of Runnables until they are done executing.
 * The runnables do not have to implement a special interface, however they should wrap any exception they
 * want to throw in a RuntimeException. The real exception that is thrown there will be
 * thrown as a {@link MultiThreadManagerException} from the {@link #runAndWait()} method.
 * <p/>
 * Example:
 * <pre>
 * public class MyRunnable implements Runnable
 * {
 *    public void run()
 *    {
 *       try
 *       {
 *          // do your logic here
 *       }
 *       catch( MyLogicalException e )
 *       {
 *          throw new RuntimeException( e ); //wrap your exception in a RuntimeException
 *       }
 *    }
 * }
 * ...
 * // in your main class
 * try
 * {
 *    MultiThreadManager manager = new MultiThreadManager( runnables, "MyRunnables-" );
 *    manager.runAndWait();
 * }
 * catch( MultiThreadManagerException e )
 * {
 *    MyLogicalException ex = (MyLogicalException)e.getCause();
 * }
 * </pre>
 */
public class MultiThreadManager
{
    private static Log logger = LogFactory.getLog(MultiThreadManager.class);
    private final List<Runnable> _runnables;
    private final String _threadNamePrefix;
    private final int _timeout;
    private final int _interruptTimeout;

    /**
     * This constructor will cause the {@link #runAndWait()} to wait for ever until all
     * runnables are done.
     *
     * @param runnables        the runnables to execute; must not be <code>null</code>
     * @param threadNamePrefix prefix of the worker thread names; a counter starting at 1 is appended
     * @throws IllegalArgumentException when <code>runnables</code> is <code>null</code>
     */
    public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix)
    {
        this(runnables, threadNamePrefix, -1, -1);
    }

    /**
     * This constructor will cause the {@link #runAndWait()} to wait for <code>timeout</code> seconds until all
     * runnables are done. When the workers have to be interrupted, there is no limit on how long
     * we wait for them to stop.
     *
     * @param runnables        the runnables to execute; must not be <code>null</code>
     * @param threadNamePrefix prefix of the worker thread names; a counter starting at 1 is appended
     * @param timeout          the timeout in seconds, or -1 to wait without a limit
     * @throws IllegalArgumentException when <code>runnables</code> is <code>null</code>
     */
    public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix, int timeout)
    {
        this(runnables, threadNamePrefix, timeout, -1);
    }

    /**
     * This constructor will cause the {@link #runAndWait()} to wait for <code>timeout</code> seconds until all
     * runnables are done. When the workers have to be interrupted, we will not wait more than
     * <code>interruptTimeout</code> seconds for the runnables to stop.
     *
     * @param runnables        the runnables to execute; must not be <code>null</code>
     * @param threadNamePrefix prefix of the worker thread names; a counter starting at 1 is appended
     * @param timeout          the timeout in seconds, or -1 to wait without a limit
     * @param interruptTimeout the time in seconds to wait for interrupted workers to stop, or -1 to wait
     *                         without a limit
     * @throws IllegalArgumentException when <code>runnables</code> is <code>null</code>
     */
    public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix, int timeout, int interruptTimeout)
    {
        // Fail at construction time instead of with a NullPointerException inside runAndWait().
        if (runnables == null)
        {
            throw new IllegalArgumentException("runnables must not be null");
        }
        _runnables = runnables;
        _threadNamePrefix = threadNamePrefix;
        _timeout = timeout;
        _interruptTimeout = interruptTimeout;
    }

    /**
     * Starts one thread per runnable and blocks until all of them have finished.
     *
     * @throws MultiThreadManagerException when a worker thread ended with an uncaught exception, when the
     *                                     workers did not finish in time, or when the waiting thread was
     *                                     interrupted
     */
    public void runAndWait() throws MultiThreadManagerException
    {
        int size = _runnables.size();
        CountDownLatch countDownLatch = new CountDownLatch(size);
        List<Thread> threads = new ArrayList<Thread>(size);
        ThreadExceptionHandler exceptionHandler = new ThreadExceptionHandler(threads);

        // Build the complete list before starting anything: the exception handler iterates over
        // 'threads' from the worker threads, so the list must no longer change once a worker runs.
        int counter = 1;
        for (Runnable runnable : _runnables)
        {
            Thread thread = new Thread(new CountDownLatchRunnable(countDownLatch, runnable), _threadNamePrefix + counter);
            thread.setUncaughtExceptionHandler(exceptionHandler);
            threads.add(thread);
            counter++;
        }

        for (Thread thread : threads)
        {
            thread.start();
        }

        waitForAllThreadsToFinish(countDownLatch, threads);

        Throwable failure = exceptionHandler.getException();
        if (failure != null)
        {
            throw new MultiThreadManagerException(failure);
        }
    }

    /**
     * Waits until the latch reaches zero (bounded by the configured timeout, if any) and then until all
     * worker threads have terminated.
     *
     * @param countDownLatch the latch to wait on
     * @param threads        the worker threads
     * @throws MultiThreadManagerException when the worker threads are still not finished after running for
     *                                     <code>timeout</code> seconds, or when the current thread is
     *                                     interrupted while waiting
     */
    private void waitForAllThreadsToFinish(CountDownLatch countDownLatch, List<Thread> threads) throws MultiThreadManagerException
    {
        try
        {
            if (_timeout == -1)
            {
                countDownLatch.await();
            }
            else
            {
                boolean success = countDownLatch.await(_timeout, TimeUnit.SECONDS);
                if (!success)
                {
                    interruptThreadsAndWait(threads, countDownLatch);
                    throw new MultiThreadManagerException("The worker threads are still not finished after waiting " + _timeout + " seconds!");
                }
            }

            // If an exception occurs, the handler is notified after the finally that lowers the counter
            // In that case, we need to wait for the threads to finish so the handler can receive the exception
            // that has been thrown.
            makeSureAllWorkerThreadsAreDead(threads);
        }
        catch (InterruptedException e)
        {
            //logger.error(e.getMessage(), e);
            try
            {
                interruptThreadsAndWait(threads, countDownLatch);
            }
            finally
            {
                // Restore the interrupt status only after the wait above: setting it earlier would
                // make that wait fail immediately. Callers can then still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new MultiThreadManagerException(e);
        }
    }

    /**
     * Blocks until none of the given threads is alive anymore. An interruption of the current thread
     * does not end the wait; it is logged and the interrupt status is restored before returning.
     *
     * @param threads the worker threads to wait for
     */
    private void makeSureAllWorkerThreadsAreDead(List<Thread> threads)
    {
        //logger.debug("makeSureAllWorkerThreadsAreDead() called...");
        boolean interrupted = false;
        try
        {
            while (stillThreadsAlive(threads))
            {
                try
                {
                    for (Thread thread : threads)
                    {
                        thread.join();
                    }
                }
                catch (InterruptedException e)
                {
                    logger.error(e.getMessage(), e);
                    // join() cleared the interrupt status; remember it instead of losing it.
                    interrupted = true;
                }
            }
        }
        finally
        {
            if (interrupted)
            {
                // Re-assert only after the loop: doing it inside would make every join() throw at once.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tells whether at least one of the given threads is still alive. The first live thread found is
     * logged at debug level and ends the search.
     *
     * @param threads the threads to check
     * @return <code>true</code> when a live thread was found, <code>false</code> otherwise
     */
    private boolean stillThreadsAlive(List<Thread> threads)
    {
        for (Thread candidate : threads)
        {
            if (!candidate.isAlive())
            {
                continue;
            }
            logger.debug("thread '" + candidate.getName() + "' is still alive!");
            return true;
        }
        return false;
    }

    /**
     * Interrupt the threads and wait for them to finish. The wait is bounded by the interrupt timeout
     * when one is configured (any value other than -1).
     *
     * @param threads        the worker threads to interrupt
     * @param countDownLatch the latch that is waited on after the threads have been interrupted
     * @throws MultiThreadManagerException when the latch did not reach zero within the interrupt timeout, or
     *                                     when the current thread is interrupted while waiting
     */
    private void interruptThreadsAndWait(List<Thread> threads, CountDownLatch countDownLatch)
            throws MultiThreadManagerException
    {
        interruptThreads(threads);
        try
        {
            // Wait for interrupted threads to stopAndReturn, so the unexecute will not interfere with the execute of the workers
            if (_interruptTimeout == -1)
            {
                countDownLatch.await();
            }
            else
            {
                boolean success = countDownLatch.await(_interruptTimeout, TimeUnit.SECONDS);
                if (!success)
                {
                    throw new MultiThreadManagerException("The worker threads did not stopAndReturn in " + _interruptTimeout + " seconds!");
                }
            }
        }
        catch (InterruptedException e)
        {
            logger.error(e.getMessage(), e);
            // await() cleared the interrupt status; restore it so callers can still see the interruption.
            Thread.currentThread().interrupt();
            throw new MultiThreadManagerException(e);
        }
    }

    /**
     * Interrupts every thread in the list except the calling thread, which is skipped when it is
     * itself part of the list.
     *
     * @param threads the threads to interrupt
     */
    private void interruptThreads(List<Thread> threads)
    {
        final Thread self = Thread.currentThread();
        for (Thread candidate : threads)
        {
            if (candidate == self)
            {
                continue;
            }
            logger.debug("interrupting thread: " + candidate);
            candidate.interrupt();
        }
    }

    /**
     * Handler for the exceptions that occur in the runnables. Only the first exception is kept;
     * when it arrives, all other worker threads are interrupted.
     */
    private class ThreadExceptionHandler implements Thread.UncaughtExceptionHandler
    {
        private final List<Thread> _threads;
        private Throwable _throwable = null;

        /**
         * @param threads the worker threads to interrupt when the first exception is reported
         */
        public ThreadExceptionHandler(List<Thread> threads)
        {
            _threads = threads;
        }

        /**
         * Records the first uncaught exception (its cause when it has one, the exception itself
         * otherwise) and interrupts the worker threads. Later exceptions are only logged.
         *
         * @param t the thread that terminated
         * @param e the exception that terminated it
         */
        public synchronized void uncaughtException(Thread t, Throwable e)
        {
            //logger.error(e.getMessage(), e);
            if (_throwable == null)
            {
                logger.debug("uncaughtException: " + e);
                _throwable = e.getCause() != null ? e.getCause() : e;
                interruptThreads(_threads);
            }
            else
            {
                logger.debug("Current thread is already interrupted");
            }
        }

        /**
         * Synchronized like the writer above, so the reading thread is guaranteed to see the
         * value stored by a worker thread.
         *
         * @return the first recorded exception, or <code>null</code> when none was reported
         */
        public synchronized Throwable getException()
        {
            return _throwable;
        }
    }
}