C#とJavaのマルチスレッド制御クラス
クラスはスレッドのセットを実行するために使用され、スレッドのセットがすべて実行されるまでブロックされます.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Runtime.CompilerServices;
namespace Pkysoft.Utility
{
/// <summary>
/// , 。
/// , MultiThreadManagerThread 。
/// usage:
/// <code>
/// List<ThreadStart> list = new List<ThreadStart>();
/// for (int i = 0; i < 1000; i++)
/// {
/// list.Add(delegate(){new TestThread().Start();});
/// }
/// MultiThreadManager mtm = new MultiThreadManager(list, "mythread", 0);
/// try
/// {
/// mtm.RunAndWait();
/// }
/// catch (MultiThreadManagerException ex)
/// {
/// Console.WriteLine("Error:" + ex);
/// }
/// </code>
/// </summary>
public class MultiThreadManager
{
List<ThreadStart> Threads;
string NamePrefix;
public MultiThreadManager(List<ThreadStart> threads, string namePrefix, int timeout)
{
this.Threads = threads;
this.NamePrefix = namePrefix;
this.EachThreadTimeout = timeout;
}
int EachThreadTimeout = 0;
public void RunAndWait()
{
List<MultiThreadManagerError> exceptions = new List<MultiThreadManagerError>();
int size = this.Threads.Count;
CountdownLatch cdl = new CountdownLatch(size);
for (int i = 0; i < size; i++)
{
ThreadStart t = this.Threads[i];
string threadName = this.NamePrefix + "-" + i;
ThreadCompletedHandler threadCompeltedHandler = new ThreadCompletedHandler(delegate(string name) { Console.WriteLine("Thread " + name + " completed"); cdl.Signal(); });
ThreadErrordHandler threadErrorHandler = new ThreadErrordHandler(delegate(string name, Exception ex) { exceptions.Add(new MultiThreadManagerError(name, ex)); Console.WriteLine("Thread " + name + " failed("); cdl.Signal(); });
ThreadRunner tr = new ThreadRunner(t, threadName, this.EachThreadTimeout, threadCompeltedHandler, threadErrorHandler);
tr.run();
}
cdl.Wait();
if (exceptions.Count > 0)
{
throw new MultiThreadManagerException(exceptions);
}
}
}
public class MultiThreadManagerError
{
public MultiThreadManagerError(string name, Exception ex)
{
this.Name = name;
this.Exception = ex;
}
public string Name;
public Exception Exception;
}
public class CountdownLatch
{
private int remain;
private EventWaitHandle evt;
public CountdownLatch(int count)
{
remain = count;
evt = new ManualResetEvent(false);
}
public void Signal()
{
// The last thread to signal also sets the event.
if (Interlocked.Decrement(ref remain) == 0)
evt.Set();
}
public void Wait()
{
evt.WaitOne();
}
}
public delegate void ThreadCompletedHandler(string name);
public delegate void ThreadErrordHandler(string name, Exception ex);
public class ThreadRunnerTimeException : Exception
{
public String Name;
public ThreadRunnerTimeException(string name)
: base("Thread " + name + " timeout.")
{
this.Name = name;
}
}
public class ThreadRunner
{
ThreadStart ThreadStart;
string ThreadName;
int Timeout;
ThreadCompletedHandler CompletedHandler;
ThreadErrordHandler ErrorHandler;
public ThreadRunner(ThreadStart ts, string name, int timeout, ThreadCompletedHandler completed, ThreadErrordHandler error)
{
this.ThreadStart = ts;
this.ThreadName = name;
this.Timeout = timeout;
this.CompletedHandler = completed;
this.ErrorHandler = error;
}
Exception ThreadException = null;
public void run()
{
new Thread(delegate()
{
try
{
Thread t = new Thread(new ThreadStart(StartThreadStart));
t.Name = this.ThreadName;
t.Start();
Console.WriteLine("Thread " + t.Name + " started");
bool jret = true;
if (this.Timeout > 0)
jret = t.Join(Timeout);
else
t.Join();
if (!jret && ThreadException == null) ThreadException = new ThreadRunnerTimeException(this.ThreadName);
if (ThreadException != null) throw ThreadException;
this.CompletedHandler(this.ThreadName);
}
catch (Exception ex)
{
this.ErrorHandler(this.ThreadName, ex);
}
}).Start();
}
protected void StartThreadStart()
{
try
{
this.ThreadStart.Invoke();
}
catch (Exception ex)
{
this.ThreadException = ex;
}
}
}
public class MultiThreadManagerException : Exception
{
public List<MultiThreadManagerError> Errors;
public MultiThreadManagerException(List<MultiThreadManagerError> errors)
: base()
{
this.Errors = errors;
}
}
}
package pkysoft.common.thread;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* , 。
* This class can be used to wait on a group of Runnables until they are done executing.
* The runnables do not have to implement a special interface, however they should wrap any exception they
* want to throw in a RuntimeException. The real exception that is thrown there will be
* thrown as a {@link MultiThreadManagerException} from the {@link #runAndWait()} method.
* <p/>
* Example:
* <pre>
* public class MyRunnable implements Runnable
* {
* public void run()
* {
* try
* {
* // do your logic here
* }
* catch( MyLogicalException e )
* {
* throw new RuntimeException( e ); //wrap your exception in a RuntimeException
* }
* }
* }
* ...
* // in your main class
* try
* {
* MultiThreadManager manager = new MultiThreadManager( runnables, "MyRunnables-" );
* manager.runAndWait();
* }
* catch( MultiThreadManagerException e )
* {
* MyLogicalException ex = (MyLogicalException)e.getCause();
* }
* </pre>
*/
public class MultiThreadManager
{
private static Log logger = LogFactory.getLog(MultiThreadManager.class);
private final List<Runnable> _runnables;
private final String _threadNamePrefix;
private final int _timeout;
private final int _interruptTimeout;
/**
* This constructor will cause the {@link #runAndWait()} to wait for ever until all
* runnables are done
*
* @param runnables
* @param threadNamePrefix
*/
public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix)
{
this(runnables, threadNamePrefix, -1, -1);
}
/**
* This constructor will cause the {@link #runAndWait()} to wait for <code>timeout</code> seconds until all
* runnables are done
*
* @param runnables
* @param threadNamePrefix
* @param timeout the timeout in seconds
*/
public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix, int timeout)
{
this(runnables, threadNamePrefix, timeout, -1);
}
/**
* This constructor will cause the {@link #runAndWait()} to wait for <code>timeout</code> seconds until all
* runnables are done. When the current thread is interrupted, we will not wait more then <code>interruptTimeout</code>
* for the runnables to stop.
*
* @param runnables
* @param threadNamePrefix
* @param timeout the timeout in seconds
* @param interruptTimeout
*/
public MultiThreadManager(List<Runnable> runnables, String threadNamePrefix, int timeout, int interruptTimeout)
{
_runnables = runnables;
_threadNamePrefix = threadNamePrefix;
_timeout = timeout;
_interruptTimeout = interruptTimeout;
}
public void runAndWait() throws MultiThreadManagerException
{
CountDownLatch countDownLatch = new CountDownLatch(_runnables.size());
List<Thread> threads = new ArrayList<Thread>(_runnables.size());
ThreadExceptionHandler exceptionHandler = new ThreadExceptionHandler(threads);
int counter = 1;
for (Runnable runnable : _runnables)
{
Thread thread = new Thread(new CountDownLatchRunnable(countDownLatch, runnable), _threadNamePrefix + counter);
thread.setUncaughtExceptionHandler(exceptionHandler);
thread.start();
counter++;
threads.add(thread);
}
waitForAllThreadsToFinish(countDownLatch, threads);
if (exceptionHandler.getException() != null)
{
//logger.debug("an exception occurred in one of the worker threads: " + exceptionHandler.getException());
throw new MultiThreadManagerException(exceptionHandler.getException());
}
else
{
//logger.debug("no exception occurred in the worker threads");
}
}
/**
* @param countDownLatch
* @param threads
* @throws MultiThreadManagerException when the worker threads are still not finished after running for <code>timeout</code> seconds
*/
private void waitForAllThreadsToFinish(CountDownLatch countDownLatch, List<Thread> threads) throws MultiThreadManagerException
{
try
{
if (_timeout == -1)
{
countDownLatch.await();
}
else
{
boolean success = countDownLatch.await(_timeout, TimeUnit.SECONDS);
if (!success)
{
interruptThreadsAndWait(threads, countDownLatch);
throw new MultiThreadManagerException("The worker threads are still not finished after waiting " + _timeout + " seconds!");
}
}
// If an exception occurs, the handler is notified after the finally that lowers the counter
// In that case, we need to wait for the treads to finish so the handler can receive the exception
// that has been thrown.
makeSureAllWorkerThreadsAreDead(threads);
}
catch (InterruptedException e)
{
//logger.error(e.getMessage(), e);
interruptThreadsAndWait(threads, countDownLatch);
throw new MultiThreadManagerException(e);
}
}
private void makeSureAllWorkerThreadsAreDead(List<Thread> threads)
{
//logger.debug("makeSureAllWorkerThreadsAreDead() called...");
while (stillThreadsAlive(threads))
{
try
{
for (Thread thread : threads)
{
thread.join();
}
}
catch (InterruptedException e)
{
logger.error(e.getMessage(), e);
}
}
}
private boolean stillThreadsAlive(List<Thread> threads)
{
boolean result = false;
for (Thread thread : threads)
{
if (thread.isAlive())
{
logger.debug("thread '" + thread.getName() + "' is still alive!");
result = true;
break;
}
}
return result;
}
/**
* Interrupt the threads and wait for them to finish
*
* @param threads
* @param countDownLatch
* @throws MultiThreadManagerException
*/
private void interruptThreadsAndWait(List<Thread> threads, CountDownLatch countDownLatch)
throws MultiThreadManagerException
{
interruptThreads(threads);
try
{
// Wait for interrupted threads to stopAndReturn, so the unexecute will not interfere with the execute of the workers
if (_interruptTimeout == -1)
{
countDownLatch.await();
}
else
{
boolean success = countDownLatch.await(_interruptTimeout, TimeUnit.SECONDS);
if (!success)
{
throw new MultiThreadManagerException("The worker threads did not stopAndReturn in " + _interruptTimeout + " seconds!");
}
}
}
catch (InterruptedException e)
{
logger.error(e.getMessage(), e);
throw new MultiThreadManagerException(e);
}
}
/**
* Interrups all the threads in the list. Will not interrupt the current thread if it is part of the list
*
* @param threads
*/
private void interruptThreads(List<Thread> threads)
{
for (Thread thread : threads)
{
if (thread != Thread.currentThread())
{
logger.debug("interrupting thread: " + thread);
thread.interrupt();
}
}
}
/**
* Handler for the exceptions that occur in the runnables
*/
private class ThreadExceptionHandler implements Thread.UncaughtExceptionHandler
{
private List<Thread> _threads;
private Throwable _throwable = null;
public ThreadExceptionHandler(List<Thread> threads)
{
_threads = threads;
}
public synchronized void uncaughtException(Thread t, Throwable e)
{
//logger.error(e.getMessage(), e);
if (_throwable == null)
{
logger.debug("uncaughtException: " + e);
_throwable = e.getCause() != null ? e.getCause() : e;
interruptThreads(_threads);
}
else
{
logger.debug("Current thread is already interrupted");
}
}
public Throwable getException()
{
return _throwable;
}
}
}