カスタムスレッドプールの作成
24443 ワード
1.単一モードのカスタムスレッドプールクラスを作成する
2.スレッドプールと通信するための基本的なタイプを定義します.
上のコードは見えますか?
3.次に、カスタムスレッドプールクラスに共通インタフェースを追加する方法について説明します.
4. 次に、スレッドプールで使用する内部クラスとタイプを示します.
5.次に、スレッドプールの初期化作業とタスクキューの初期化を行います.
特に注意したいのは、taskSchedulerというスレッドクラスオブジェクトです.これは、スレッドプールのライフサイクルを常に貫通する追加のスレッドであり、プライマリスレッドとも言える.
ユーザー・タスク・キューを監視し、できるだけ早く実行するタスクです.また、最大と最小の強制も担当します.
スレッド数の制限、いくつかのクリーンアップを行います.
6.次に、スレッドの初期化を実現し、近接完了アルゴリズムを使用します.
7.ユーザタスクキュー実装
8.最後にテストコード
public class CustomThreadPool
{
//#region configurable items - for demo let's have these as constants
private const int MAX = 8; // maximum no of threads in pool
private const int MIN = 3; // minimum no of threads in pool
private const int MIN_WAIT = 10; // milliseconds
private const int MAX_WAIT = 15000; // milliseconds - threshold for simple task
private const int CLEANUP_INTERVAL = 60000; // millisecond - to free waiting threads in pool
private const int SCHEDULING_INTERVAL = 10; // millisecond - look for task in queue in loop
//#endregion
//#region singleton instance of threadpool
private static readonly CustomThreadPool _instance = new CustomThreadPool();
private CustomThreadPool() {
InitializeThreadPool();
}
public static CustomThreadPool Instance
{
get
{
return _instance;
}
}
//#endregion
private void InitializeThreadPool() {
//TODO: write initialization code here
}
}
2.スレッドプールと通信するための基本的なタイプを定義します.
public delegate void UserTask();
public class ClientHandle
{
public Guid ID;
public bool IsSimpleTask = false;
}
public class TaskStatus
{
public bool Success = true;
public Exception InnerException = null;
}
上のコードは見えますか?
UserTask
は、スレッドプール内のスレッドが実行するタスクを表すエージェントです.3.次に、カスタムスレッドプールクラスに共通インタフェースを追加する方法について説明します.
public ClientHandle QueueUserTask(UserTask task, Action<TaskStatus> callback)
{
throw new Exception("not implemented yet.");
}
public static void CancelUserTask(ClientHandle handle)
{
//TODO: write implementation code here
}
4. 次に、スレッドプールで使用する内部クラスとタイプを示します.
//#region nested private types
enum TaskState // to represent current state of a usertask
{
notstarted,
processing,
completed,
aborted
}
class TaskHandle // Item in waiting queue
{
public ClientHandle Token; // generate this everytime an usertask is queued and return to the caller as a reference.
public UserTask task; // the item to be queued - supplied by the caller
public Action<TaskStatus> callback; // optional - in case user want's a notification of completion
}
class TaskItem // running items in the pool - TaskHandle gets a thread to execute it
{
public TaskHandle taskHandle;
public Thread handler;
public TaskState taskState = TaskState.notstarted;
public DateTime startTime = DateTime.MaxValue;
}
//#endregion
5.次に、スレッドプールの初期化作業とタスクキューの初期化を行います.
//private instance members
private Queue<TaskHandle> ReadyQueue = null;
private List<TaskItem> Pool = null;
private Thread taskScheduler = null;
private void InitializeThreadPool()
{
ReadyQueue = new Queue<TaskHandle>();
Pool = new List<TaskItem>();
taskScheduler = new Thread(() =>
{
//TODO: write scheduling logic here
});
taskScheduler.Start();
}
特に注意したいのは、taskSchedulerというスレッドクラスオブジェクトです.これは、スレッドプールのライフサイクルを常に貫通する追加のスレッドであり、プライマリスレッドとも言える.
ユーザー・タスク・キューを監視し、できるだけ早く実行するタスクです.また、最大と最小の強制も担当します.
スレッド数の制限、いくつかのクリーンアップを行います.
6.次に、スレッドの初期化を実現し、近接完了アルゴリズムを使用します.
private void InitializeThreadPool()
{
ReadyQueue = new Queue<TaskHandle>();
Pool = new List<TaskItem>();
InitPoolWithMinCapacity(); // initialize Pool with Minimum capacity - that much thread must be kept ready
DateTime LastCleanup = DateTime.Now; // monitor this time for next cleaning activity
taskScheduler = new Thread(() =>
{
do
{
while (ReadyQueue.Count > 0 && ReadyQueue.Peek().task == null)
ReadyQueue.Dequeue();
// remove cancelled item/s - cancelled item will have it's task set to null
int itemCount = ReadyQueue.Count;
for (int i = 0; i < itemCount; i++)
{
TaskHandle readyItem = ReadyQueue.Peek(); // the Top item of queue
bool Added = false;
foreach (TaskItem ti in Pool)
{
if (ti.taskState == TaskState.completed)
{
// if in the Pool task state is completed then a different
// task can be handed over to that thread
ti.taskHandle = readyItem;
ti.taskState = TaskState.notstarted;
Added = true;
ReadyQueue.Dequeue();
break;
}
}
if (!Added && Pool.Count < MAX)
{
// if all threads in pool are busy and the count is still less than the
// Max limit set then create a new thread and add that to pool
TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
ti.taskHandle = readyItem;
// add a new TaskItem in the pool
AddTaskToPool(ti);
Added = true;
ReadyQueue.Dequeue();
}
if (!Added) break; // It's already crowded so try after sometime
}
if ((DateTime.Now - LastCleanup) > TimeSpan.FromMilliseconds(CLEANUP_INTERVAL))
// It's long time - so try to cleanup Pool once.
{
CleanupPool();
LastCleanup = DateTime.Now;
}
else
{
// either of these two can work - the combination is also fine for our demo.
Thread.Yield();
Thread.Sleep(SCHEDULING_INTERVAL); // Dont run madly in a loop - wait for sometime for things to change.
// the wait should be minimal - close to zero
}
} while (true);
});
taskScheduler.Priority = ThreadPriority.AboveNormal;
taskScheduler.Start();
}
private void InitPoolWithMinCapacity()
{
for (int i = 0; i <= MIN; i++)
{
TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
ti.taskHandle = new TaskHandle() { task = () => { } };
ti.taskHandle.callback = (taskStatus) => { };
ti.taskHandle.Token = new ClientHandle() { ID = Guid.NewGuid() };
AddTaskToPool(ti);
}
}
private void AddTaskToPool(TaskItem taskItem)
{
taskItem.handler = new Thread(() =>
{
do
{
bool Enter = false;
// if aborted then allow it to exit the loop so that it can complete and free-up thread resource.
// this state means it has been removed from Pool already.
if (taskItem.taskState == TaskState.aborted) break;
if (taskItem.taskState == TaskState.notstarted)
{
taskItem.taskState = TaskState.processing;
taskItem.startTime = DateTime.Now;
Enter = true;
}
if (Enter)
{
TaskStatus taskStatus = new TaskStatus();
try
{
taskItem.taskHandle.task.Invoke(); // execute the UserTask
taskStatus.Success = true;
}
catch (Exception ex)
{
taskStatus.Success = false;
taskStatus.InnerException = ex;
}
if (taskItem.taskHandle.callback != null && taskItem.taskState != TaskState.aborted)
{
try
{
taskItem.taskState = TaskState.completed;
taskItem.startTime = DateTime.MaxValue;
taskItem.taskHandle.callback(taskStatus); // notify callback with task-status
}
catch
{
}
}
}
// give other thread a chance to execute as it's current execution completed already
Thread.Yield(); Thread.Sleep(MIN_WAIT); //TODO: need to see if Sleep is required here
} while (true); // it's a continuous loop until task gets abort request
});
taskItem.handler.Start();
Pool.Add(taskItem);
}
private void CleanupPool()
{
throw new NotImplementedException();
}
7.ユーザタスクキュー実装
public ClientHandle QueueUserTask(UserTask task, Action<taskstatus> callback)
{
TaskHandle th = new TaskHandle()
{
task = task,
Token = new ClientHandle()
{
ID = Guid.NewGuid()
},
callback = callback
};
ReadyQueue.Enqueue(th);
return th.Token;
}
8.最後にテストコード
CustomThreadPool MyPool;
private void Form1_Load(object sender, EventArgs e)
{
MyPool = CustomThreadPool.Instance;
}
void showMessage(string message)
{
MessageBox.Show(message);
}
int x = 0;
private void btnStart_Click(object sender, EventArgs e)
{
x++;
int arg = x;
MyPool.QueueUserTask(() =>
{
showMessage(arg.ToString());
},
(ts) =>
{
showMessage(ts.Success.ToString());
});
}