興味深いUnityマルチスレッドの練習例

27526 ワード

ここでは、C#のスレッドに関する情報を記録して随時更新し、Unityで使いやすいツールクラスとしてまとめていきます。
いくつかのTips:
1)Monitor VS. Mutex  
Difference between mutex and monitor. 
The biggest difference between the two is the scope: a mutex's scope is system-wide, whilst monitor's scope depends on the scope of the object you lock on. This means that with monitor, the widest scope you can get is generally application-wide.
So you would mostly use monitor to synchronise between threads running in an application, and use mutex to synchronise between different applications.
2)Monitor VS. lock
lock is just a shortcut for Monitor.Enter with try + finally and Monitor.Exit. Use the lock statement whenever it is enough - if you need something like TryEnter, you will have to use Monitor.
3)Semaphore VS. Monitors
Semaphore.WaitOne/Release vs Monitor.Pulse/Wait
Monitor.Wait and Monitor.Pulse are fundamental synchronization mechanisms that can be used to form pretty much any other synchronization device including semaphores.
結論として,Monitorを用いることでマルチスレッドの目的を達成することができ,lockはこの過程を簡略化することができる.
4)Terminating a thread cleanly
5) Should a return statement be inside or outside a lock?
 At the IL level they are identical.
Only the current owner of the lock can signal a waiting object using Pulse.
The Pulse, PulseAll, and Wait methods must be invoked from within a synchronized block of code.
ダウンロードスレッドの例:
 /// <summary>
 /// Download pool: WORK_NUM worker threads take jobs from the working queue and hand
 /// them to the finished queue; one checker thread inspects finished jobs and re-queues
 /// those whose isOK flag is false. All queue access is serialized on a private lock.
 /// </summary>
 private class DownloadFactory {
        #region ===== state shared between threads (guarded by gate) =====
        private readonly object gate = new object(); // private lock target instead of lock(this)
        private volatile bool isOpen;
        private bool started; // true once Open() has started the threads
        private Queue<DownloadJob> workingJobs = new Queue<DownloadJob>();
        private Queue<DownloadJob> finishedJobs = new Queue<DownloadJob>();
        // Jobs currently held by a worker or by the checker, i.e. in neither queue.
        // Without this counter isDone would report true while downloads are still running.
        private int inFlight;
        #endregion

        const int WORK_NUM = 5; // number of worker threads
        private Thread[] workers; // threads that perform the downloads
        private Thread checker;  // thread that verifies finished jobs

        /// <summary>Creates the worker and checker threads; nothing runs until Open().</summary>
        public DownloadFactory() {
            isOpen = false;

            workers = new Thread[WORK_NUM];
            for (int i = 0; i < WORK_NUM; i++) {
                workers[i] = new Thread(() => {
                    try {
                        while (isOpen) {
                            var job = takeJob();
                            // do download affairs!
                            finJob(job);
                        }
                    } catch (ThreadInterruptedException) {
                        // Close() interrupts blocked threads; leaving the loop is the expected reaction.
                    }
                });
            }

            checker = new Thread(() => {
                try {
                    while (isOpen) {
                        var job = checkJob();
                        if (!job.isOK) {
                            // Verification failed: schedule the job again.
                            putJob(job);
                        } else {
                            // Verified: the job leaves the factory for good.
                            completeJob();
                        }
                    }
                } catch (ThreadInterruptedException) {
                    // Interrupted by Close(); just end the thread.
                }
            });
        }

        /// <summary>True when no job is queued, being downloaded, or being checked.</summary>
        public bool isDone {
            get {
                return totalCount == 0;
            }
        }

        /// <summary>Installs the job queue and starts the checker and all workers.</summary>
        /// <param name="_allJobs">Jobs to process; must not be null.</param>
        /// <exception cref="System.ArgumentNullException">When _allJobs is null.</exception>
        public void Open(Queue<DownloadJob> _allJobs) {
            if (_allJobs == null) {
                throw new System.ArgumentNullException("_allJobs");
            }
            lock (gate) {
                workingJobs = _allJobs;
            }
            isOpen = true;
            started = true;
            checker.Start();
            foreach (var w in workers) {
                w.Start();
            }
        }

        /// <summary>Stops all threads: clears the run flag, interrupts blocked threads, then joins them.</summary>
        public void Close() {
            isOpen = false;
            if (!started) {
                // Thread.Join throws ThreadStateException on a thread that was never started.
                return;
            }
            checker.Interrupt();
            foreach (var w in workers) {
                w.Interrupt();
            }
            checker.Join();
            foreach (var w in workers) {
                w.Join();
            }
        }

        #region ========= queue operations (all take the lock) ===========
        // Number of jobs still inside the factory: queued, finished-but-unchecked, or in flight.
        private int totalCount {
            get {
                lock (gate) {
                    return workingJobs.Count + finishedJobs.Count + inFlight;
                }
            }
        }

        // Blocks until a job is available in the working queue, then removes and returns it.
        private DownloadJob takeJob() {
            lock (gate) {
                while (workingJobs.Count <= 0) {
                    Monitor.Wait(gate);
                }
                var rt = workingJobs.Dequeue();
                inFlight++;
                Monitor.PulseAll(gate);
                return rt;
            }
        }

        // Returns a job held by the checker to the working queue.
        private void putJob(DownloadJob job) {
            lock (gate) {
                workingJobs.Enqueue(job);
                inFlight--;
                Monitor.PulseAll(gate);
            }
        }

        // Moves a job held by a worker to the finished queue.
        private void finJob(DownloadJob job) {
            lock (gate) {
                finishedJobs.Enqueue(job);
                inFlight--;
                Monitor.PulseAll(gate);
            }
        }

        // Blocks until a finished job is available, then removes and returns it.
        private DownloadJob checkJob() {
            lock (gate) {
                while (finishedJobs.Count <= 0) {
                    Monitor.Wait(gate);
                }
                var job = finishedJobs.Dequeue();
                inFlight++;
                Monitor.PulseAll(gate);
                return job;
            }
        }

        // Drops a verified job held by the checker from the in-flight count.
        private void completeJob() {
            lock (gate) {
                inFlight--;
                Monitor.PulseAll(gate);
            }
        }
        #endregion
    }

例:生産者・消費者モデル
using UnityEngine;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using UnityEngine.UI;

/// <summary>
/// Producer/consumer demo: six "maker" threads put cake strings on a bounded Table and
/// six "eater" threads take them off. Worker threads never touch UI objects directly;
/// they queue messages on a TMsgHandler, which a coroutine drains once per frame.
/// </summary>
public class MultiThread : MonoBehaviour {
    // Every worker created in Start(); OnDestroy() kills them all.
    private List<ThreadHelper> threads = new List<ThreadHelper>();
    public Text txtMessage;
    public Text txtMsg2;

    void Start() {
        var kitchen = new Kitchen();
        var table = new Table(10);
        // Plain (non-delegate) messages are shown in txtMsg2.
        var msgHandler = new TMsgHandler(this, msg => {
            txtMsg2.text = msg as string;
        });

        // Consumer body: env.param is the int seed for this thread's random generator.
        ThreadHelper.Run eater = env => {
            try {
                var random = new System.Random((int)env.param);
                while (env.alive) {
                    var cake = table.take();
                    Thread.Sleep(random.Next(100));
                    msgHandler.send(delegate {
                        txtMessage.text = env.name + "" + cake;
                    });
                }
            } catch (ThreadInterruptedException) {
                msgHandler.send(delegate {
                    txtMessage.text = string.Format("The {0} Interrupted!", env.name);
                });
            } finally {
                msgHandler.send(delegate {
                    txtMessage.text = string.Format("The {0} Ended!", env.name);
                });
            }
        };

        // Producer body: env.param is an object[] whose element [1] is the int random seed.
        ThreadHelper.Run maker = env => {
            try {
                var param = env.param as object[];
                var random = new System.Random((int)param[1]);
                while (env.alive) {
                    Thread.Sleep(random.Next(100));
                    var cake = string.Format("[ Cake No.{0} by {1} ]", kitchen.nextCakeId, env.name);
                    table.put(cake);
                    msgHandler.send(env.name + "" + cake);
                }

            } catch (ThreadInterruptedException) {
                msgHandler.send(delegate {
                    txtMessage.text = string.Format("The {0} Interrupted!", env.name);
                });
            } finally {
                msgHandler.send(delegate {
                    txtMessage.text = string.Format("The {0} Ended!", env.name);
                });
            }
        };
        // Random seeds: 13821, 97535, 79593, 87481, 66158, 18199, 40915, 63093, 77880, 93602, 17603, 38327
        threads.Add(new ThreadHelper("MakerThread-1", maker, new object[] { "111", 13821 }));
        threads.Add(new ThreadHelper("MakerThread-2", maker, new object[] { "222", 97535 }));
        threads.Add(new ThreadHelper("MakerThread-3", maker, new object[] { "111", 79593 }));
        threads.Add(new ThreadHelper("MakerThread-4", maker, new object[] { "222", 87481 }));
        threads.Add(new ThreadHelper("MakerThread-5", maker, new object[] { "111", 66158 }));
        threads.Add(new ThreadHelper("MakerThread-6", maker, new object[] { "222", 18199 }));
        threads.Add(new ThreadHelper("EaterThread-A", eater, 40915));
        threads.Add(new ThreadHelper("EaterThread-B", eater, 63093));
        threads.Add(new ThreadHelper("EaterThread-C", eater, 77880));
        threads.Add(new ThreadHelper("EaterThread-D", eater, 93602));
        threads.Add(new ThreadHelper("EaterThread-E", eater, 17603));
        threads.Add(new ThreadHelper("EaterThread-F", eater, 38327));

        // Start every worker.
        threads.ForEach(t => t.Start());
    }

    void OnDestroy() {
        threads.ForEach(m => m.Kill());
    }

    /// <summary>
    /// Message queue between worker threads and the coroutine started on the given
    /// MonoBehaviour, similar in spirit to android.os.Handler. send() may be called from
    /// any thread; queued messages are dispatched by the coroutine, one batch per frame.
    /// </summary>
    public class TMsgHandler {
        public delegate void PostHandler();

        private readonly object gate = new object(); // guards 'messages'
        private System.Action<object> msgHandler;
        private List<object> messages = new List<object>();

        /// <param name="mono">Behaviour that hosts the dispatch coroutine.</param>
        /// <param name="handler">Callback for messages that are not PostHandler delegates; may be null.</param>
        public TMsgHandler(MonoBehaviour mono, System.Action<object> handler = null) {
            msgHandler = handler;
            mono.StartCoroutine(checkMessage());
        }

        /// <summary>Queues a delegate to be invoked by the dispatch coroutine.</summary>
        public void send(PostHandler action) {
            lock (gate) {
                messages.Add(action);
            }
        }

        /// <summary>Queues an arbitrary message for the handler passed to the constructor.</summary>
        public void send(object param) {
            lock (gate) {
                messages.Add(param);
            }
        }

        // Swaps out the pending list so dispatching happens outside the lock; null when nothing is pending.
        private List<object> getMessages() {
            lock (gate) {
                if (messages.Count > 0) {
                    var old = messages;
                    messages = new List<object>();
                    return old;
                }
                return null;
            }
        }

        // Coroutine: once per frame, dispatch everything queued since the previous frame.
        private IEnumerator checkMessage() {
            while (true) {
                yield return null;
                var msgs = getMessages();
                if (msgs == null) {
                    continue;
                }
                // Informational only, so it is logged at normal level rather than as an error.
                Debug.Log(msgs.Count);
                foreach (var m in msgs) {
                    try {
                        var h = m as PostHandler;
                        if (h != null) {
                            h();
                        } else if (msgHandler != null) {
                            msgHandler(m);
                        }
                    } catch (System.Exception ex) {
                        // A failing handler must not end the coroutine and silently drop all later messages.
                        Debug.LogException(ex);
                    }
                }
            }
        }
    }

    /// <summary>Named background thread plus a cooperative 'alive' flag and a user parameter.</summary>
    public class ThreadHelper {
        public delegate void Run(ThreadHelper env);
        public Run run = null;
        public volatile bool alive;

        public string name { get { return thread.Name; } }
        public Thread thread { get; private set; }
        public object param = null;
        public ThreadHelper(string name, Run _run, object _param = null) {
            alive = false;
            run = _run;
            param = _param;
            thread = new Thread(_inner_run);
            thread.Name = name;
            //background threads do not prevent a process from terminating.
            //Once all foreground threads belonging to a process have terminated,
            //the common language runtime ends the process.
            thread.IsBackground = true;
        }

        // Thread entry point: invokes the delegate with this helper as its environment.
        public void _inner_run() {
            if (run != null) {
                run(this);
            }
        }

        /// <summary>Sets alive and starts the thread.</summary>
        public void Start() {
            alive = true;
            thread.Start();
        }

        /// <summary>Asks the loop to end at its next 'alive' check without interrupting it.</summary>
        public void notifyStop() {
            alive = false;
        }

        /// <summary>Clears alive, interrupts the thread and joins it for up to timeout milliseconds.</summary>
        public void Kill(int timeout = 0) {
            alive = false;
            // Thread.Join throws ThreadStateException on a thread that was never started.
            if ((thread.ThreadState & System.Threading.ThreadState.Unstarted) != 0) {
                return;
            }
            thread.Interrupt();
            thread.Join(timeout);
        }
    }

    /// <summary>Hands out consecutive cake ids to concurrent callers.</summary>
    public class Kitchen {
        private int CakeId;

        // Post-increment semantics: the first id returned is 0.
        public int nextCakeId {
            get {
                return Interlocked.Increment(ref CakeId) - 1;
            }
        }
    }

    /// <summary>Bounded ring buffer of cakes; put blocks while full, take blocks while empty.</summary>
    public class Table {
        private readonly object gate = new object();
        private string[] buffer;
        private int tail;  // next slot to write
        private int head;  // next slot to read
        private int count; // number of occupied slots

        public Table(int _count) {
            this.buffer = new string[_count];
            tail = head = count = 0;
        }

        public void put(string cake) {
            lock (gate) {
                while (count >= buffer.Length) {
                    Monitor.Wait(gate);
                }
                buffer[tail] = cake;
                tail = (tail + 1) % buffer.Length;
                count++;
                // Producers and consumers wait on the same monitor, so a single Pulse could
                // wake another producer and strand the consumers; wake everyone instead.
                Monitor.PulseAll(gate);
            }
        }

        public string take() {
            lock (gate) {
                while (count <= 0) {
                    Monitor.Wait(gate);
                }

                var cake = buffer[head];
                head = (head + 1) % buffer.Length;
                count--;
                Monitor.PulseAll(gate);
                return cake;
            }
        }
    }
}

 
より興味深いバージョン:
using UnityEngine;
using System.Collections;
using System.Threading;
using System.Collections.Generic;

//        
/// <summary>
/// Base class for a named background thread with a cooperative 'alive' flag.
/// The delegate stored in 'run' receives this object when the thread starts.
/// </summary>
public class ThreadObj {
    // Invoked with 'this' in _inner_run, so the delegate must accept a ThreadObj.
    protected System.Action<ThreadObj> run = null;
    public volatile bool alive;

    public string thdName { get { return thread.Name; } }
    public Thread thread { get; private set; }
    public object param = null;

    /// <param name="name">Name assigned to the underlying thread.</param>
    /// <param name="_run">Thread body; subclasses may assign 'run' later instead.</param>
    /// <param name="_param">Arbitrary user data exposed through 'param'.</param>
    public ThreadObj(string name, System.Action<ThreadObj> _run = null, object _param = null) {
        alive = false;
        run = _run;
        param = _param;
        thread = new Thread(_inner_run);
        thread.Name = name;
        //background threads do not prevent a process from terminating.
        //Once all foreground threads belonging to a process have terminated,
        //the common language runtime ends the process.
        thread.IsBackground = true;
    }

    // Thread entry point; does nothing when no body was supplied.
    public void _inner_run() {
        if (run != null) {
            run(this);
        }
    }

    /// <summary>Sets alive and starts the thread.</summary>
    public void Start() {
        alive = true;
        thread.Start();
    }

    /// <summary>Asks the body to stop at its next 'alive' check without interrupting it.</summary>
    public void notifyStop() {
        alive = false;
    }

    /// <summary>Clears alive, interrupts the thread and joins it for up to timeout milliseconds.</summary>
    public void Kill(int timeout = 0) {
        alive = false;
        // Thread.Join throws ThreadStateException on a thread that was never started.
        if ((thread.ThreadState & System.Threading.ThreadState.Unstarted) != 0) {
            return;
        }
        thread.Interrupt();
        thread.Join(timeout);
    }
}

// Message handler that hands work from worker threads to a Unity coroutine, similar to android.os.Handler
/// <summary>
/// Message queue between worker threads and a coroutine started on the given
/// MonoBehaviour. send() may be called from any thread; queued messages are dispatched
/// by the coroutine, one batch per frame.
/// </summary>
public class TMsgHandler {
    public delegate void PostHandler();

    private readonly object gate = new object(); // guards 'messages'
    private System.Action<object> msgHandler;
    private List<object> messages = new List<object>();

    /// <param name="mono">Behaviour that hosts the dispatch coroutine.</param>
    /// <param name="handler">Callback for messages that are not PostHandler delegates; may be null.</param>
    public TMsgHandler(MonoBehaviour mono, System.Action<object> handler = null) {
        msgHandler = handler;
        mono.StartCoroutine(checkMessage());
    }

    /// <summary>Queues a message; a PostHandler delegate is invoked, anything else goes to the handler.</summary>
    public void send(object param) {
        lock (gate) {
            messages.Add(param);
        }
    }

    // Swaps out the pending list so dispatching happens outside the lock; null when nothing is pending.
    private List<object> getMessages() {
        lock (gate) {
            if (messages.Count > 0) {
                var old = messages;
                messages = new List<object>();
                return old;
            }
            return null;
        }
    }

    // Coroutine: once per frame, dispatch everything queued since the previous frame.
    private IEnumerator checkMessage() {
        while (true) {
            yield return null;
            var msgs = getMessages();
            if (msgs == null) {
                continue;
            }
            foreach (var m in msgs) {
                try {
                    var h = m as PostHandler;
                    if (h != null) {
                        h();
                    } else if (msgHandler != null) {
                        msgHandler(m);
                    }
                } catch (System.Exception ex) {
                    // A failing handler must not end the coroutine and silently drop all later messages.
                    Debug.LogException(ex);
                }
            }
        }
    }
}
using UnityEngine;
using UnityEngine.UI;

//     
/// <summary>
/// Scene entry point for the dining-hall demo: creates the message handler, initializes
/// the DiningHall with it and opens the hall on Start; closes the hall on destroy.
/// </summary>
public class UIDiningHall : MonoBehaviour {
    public Text cakeNo;
    public Table table;
    public Transform makers;
    public Transform eaters;

    private DiningHall diningHall = new DiningHall();

    // Use this for initialization
    void Start () {
        // Maker and eater events are both just written to the console.
        var uiPump = new TMsgHandler(this, msg => {
            bool fromMaker = msg is Maker.Event;
            bool fromEater = msg is Eater.Event;
            if (fromMaker || fromEater) {
                Debug.Log(msg.ToString());
            }
        });
        diningHall.Init(uiPump);
        diningHall.Open();  // start all cooks and eaters
    }

    void OnDestroy() {
        Debug.LogError("=========Dining Hall is Closing===========");
        diningHall.Close(); // stop all threads
    }
}
using UnityEngine;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

//   ,     
/// <summary>
/// Owns the kitchen, the table and every Maker/Eater thread, and forwards events from
/// those threads to the TMsgHandler supplied in Init.
/// </summary>
public class DiningHall {

    // All makers and eaters; both derive from ThreadObj.
    private List<ThreadObj> persons = new List<ThreadObj>();

    public Kitchen kitchen { get; private set; }
    public Table table { get; private set; }
    private TMsgHandler handler;

    /// <summary>Creates a kitchen with 30 machines, a table with 5 slots, 8 makers and 8 eaters.</summary>
    /// <param name="msgHandler">Receiver for events sent through send2UI.</param>
    public void Init(TMsgHandler msgHandler) {
        handler = msgHandler;
        kitchen = new Kitchen(30);
        table = new Table(5);

        // Random seeds: 13821, 97535, 79593, 87481, 66158, 18199,
        // 40915, 63093, 77880, 93602, 17603, 38327

        // Makers (name, speed, seed)
        persons.Add(new Maker("CookA", 1, 13821, this));
        persons.Add(new Maker("CookB", 1, 97535, this));
        persons.Add(new Maker("CookC", 1, 79593, this));
        persons.Add(new Maker("CookD", 1, 87481, this));
        persons.Add(new Maker("CookE", 1, 87481, this));
        persons.Add(new Maker("CookF", 1, 87481, this));
        persons.Add(new Maker("CookG", 1, 87481, this));
        persons.Add(new Maker("CookA", 1, 13821, this));

        // Eaters (name, speed, seed)
        persons.Add(new Eater("Eater1", 10, 40915, this));
        persons.Add(new Eater("Eater2", 20, 63093, this));
        persons.Add(new Eater("Eater3", 30, 77880, this));
        persons.Add(new Eater("Eater4", 40, 93602, this));
        persons.Add(new Eater("Eater1", 10, 40915, this));
        persons.Add(new Eater("Eater2", 20, 63093, this));
        persons.Add(new Eater("Eater3", 30, 77880, this));
        persons.Add(new Eater("Eater4", 40, 93602, this));
    }

    /// <summary>Queues a message on the handler supplied to Init.</summary>
    public void send2UI(object param) {
        handler.send(param);
    }

    /// <summary>Takes the next cake from the table; delegates to Table.take.</summary>
    public Cake takeCake() {
        return table.take();
    }

    /// <summary>Starts every maker and eater thread.</summary>
    public void Open() {
        persons.ForEach(t => t.Start());
    }

    /// <summary>Calls Kill on every maker and eater.</summary>
    public void Close() {
        persons.ForEach(t => t.Kill());
    }


}

/// <summary>
/// Consumer thread: repeatedly takes a cake from the dining hall and eats it in random
/// bites until its percent reaches zero, reporting each step as an Event.
/// </summary>
class Eater : ThreadObj {
    public string name { get; private set; } // display name, same as the thread name
    public int speed { get; private set; } // exclusive upper bound of one bite
    private System.Random rand;
    private DiningHall dh;

    public Eater(string name, int speed, int seed, DiningHall dh) : base(name) {
        this.dh = dh;
        this.rand = new System.Random(seed);
        this.speed = speed;
        this.name = name;
        this.run = Run;
    }

    // Thread body; a ThreadInterruptedException simply ends it.
    private void Run(ThreadObj env) {
        try {
            while (env.alive) {
                // Announce, then block until the table has a cake.
                Event.takeStart(this);
                Cake portion = dh.takeCake();

                // Bite, report, pause - until nothing is left.
                while (true) {
                    if (!(portion.percent > 0)) {
                        break;
                    }
                    int bite = rand.Next(speed);
                    portion.eat(bite);
                    Event.eatStart(this, portion);
                    Thread.Sleep(100);
                }
                Debug.LogWarning("[Eater] " + portion.ToString());
                Event.eatEnd(this);
            }
        } catch (ThreadInterruptedException) {
            // Interrupted (see ThreadObj.Kill): fall out of the loop quietly.
        }
    }

    /// <summary>Progress notification sent to the UI; type 0 = take start, 1 = eating, 2 = eat end.</summary>
    public class Event {
        public int id { get; private set; }
        public int type { get; private set; }
        public Cake cake { get; private set; }

        public static void takeStart(Eater eater) {
            var evt = new Event();
            evt.type = 0;
            eater.dh.send2UI(evt);
        }

        // Sends a snapshot (clone) of the cake rather than the instance being eaten.
        public static void eatStart(Eater eater, Cake _cake) {
            var evt = new Event();
            evt.type = 1;
            evt.cake = _cake.clone();
            eater.dh.send2UI(evt);
        }

        public static void eatEnd(Eater eater) {
            var evt = new Event();
            evt.type = 2;
            eater.dh.send2UI(evt);
        }

        public override string ToString() {
            string cakeText;
            if (cake == null) {
                cakeText = "-";
            } else {
                cakeText = cake.ToString();
            }
            return string.Format("[Eater] type: {0}, cake: {1}", type, cakeText);
        }
    }
}

//   
/// <summary>
/// Producer thread: repeatedly obtains a machine from the kitchen, cooks a randomly
/// sized cake on it, collects the result and puts it on the table, reporting each step
/// as an Event.
/// </summary>
public class Maker : ThreadObj {
    public string name { get; private set; } // display name, same as the thread name
    public float speed { get; private set; } // stored only; not read by Run
    private System.Random rand;
    private DiningHall dh;

    public Maker(string name, float speed, int seed, DiningHall dh) : base(name) {
        this.dh = dh;
        this.rand = new System.Random(seed);
        this.speed = speed;
        this.name = name;
        this.run = Run;
    }

    // Thread body; a ThreadInterruptedException simply ends it.
    private void Run(ThreadObj env) {
        try {
            while (env.alive) {
                // 0. Wait for a machine.
                Event.waitMachine(this);
                CakeMachine oven = dh.kitchen.getMachine();

                // 1. Pick type first, then amount (same order of random draws), and start cooking.
                int kind = rand.Next(5);
                int size = rand.Next(50, 200);
                Cake order = new Cake(kind, size);
                oven.Cook(order);

                Event.waitCooking(this, order);
                Cake baked = oven.GetCake(); // blocks until the machine is no longer cooking

                // 2. Put the result on the table.
                Event.putCake(this);
                dh.table.put(baked);

                Event.putCakeEnd(this);
            }
        } catch (ThreadInterruptedException) {
            // Interrupted (see ThreadObj.Kill): fall out of the loop quietly.
        }
    }

    /// <summary>Progress notification sent to the UI; type 0 = wait machine, 1 = cooking, 2 = put start, 3 = put end.</summary>
    public class Event {
        public int type { get; private set; }
        public Cake cake { get; private set; }

        public static void waitMachine(Maker m) {
            var evt = new Event();
            evt.type = 0;
            m.dh.send2UI(evt);
        }

        // Sends a snapshot (clone) of the cake rather than the instance being cooked.
        public static void waitCooking(Maker m, Cake c) {
            var evt = new Event();
            evt.type = 1;
            evt.cake = c.clone();
            m.dh.send2UI(evt);
        }

        public static void putCake(Maker m) {
            var evt = new Event();
            evt.type = 2;
            m.dh.send2UI(evt);
        }

        public static void putCakeEnd(Maker m) {
            var evt = new Event();
            evt.type = 3;
            m.dh.send2UI(evt);
        }

        public override string ToString() {
            string cakeText;
            if (cake == null) {
                cakeText = "-";
            } else {
                cakeText = cake.ToString();
            }
            return string.Format("[Maker] type: {0}, cake: {1}", type, cakeText);
        }
    }
}

//   
/// <summary>
/// A cake that is first made (status 0) up to totalAmount, then eaten (status 1) down to
/// zero, after which it is finished (status -1). Instances are not synchronized.
/// </summary>
public class Cake {
    // Last id handed out; shared by all threads, so it is only advanced atomically.
    static int ID = 0;
    private int _id = 0;
    public int type { get; private set; }
    public float totalAmount { get; private set; } // amount of a complete cake
    public float curAmount { get; private set; }   // amount currently present
    private int status;  // 0 = being made, 1 = ready / being eaten, -1 = eaten up

    /// <summary>curAmount / totalAmount; NaN or infinite when totalAmount is 0.</summary>
    public float percent { get { return curAmount / totalAmount; } }

    /// <param name="type">Cake kind.</param>
    /// <param name="totalAmount">Amount of the complete cake.</param>
    /// <param name="id">Explicit id, or a negative value to allocate the next unique id.</param>
    /// <param name="curAmount">Initial amount.</param>
    /// <param name="status">Initial status (0, 1 or -1).</param>
    public Cake(int type, float totalAmount, int id=-1, float curAmount =0, int status = 0) {
        this.type = type;
        this.totalAmount = totalAmount;
        this.curAmount = curAmount;
        this.status = status;
        // Cakes are created concurrently by several maker threads; a plain ++ID could
        // hand the same id to two cakes.
        _id = id < 0 ? System.Threading.Interlocked.Increment(ref ID) : id;
    }

    public int id {
        get {
            return _id;
        }
    }

    /// <summary>Adds amount while status is 0; clamps at totalAmount and switches to status 1.</summary>
    public void make(float amount) {
        if (status == 0) {
            curAmount += amount;
            if (curAmount >= totalAmount) {
                curAmount = totalAmount;
                status = 1;
            }
        }
    }

    /// <summary>Removes amount while status is 1; clamps at 0 and switches to status -1.</summary>
    public void eat(int amount) {
        if (status == 1) {
            curAmount -= amount;
            if (curAmount <= 0) {
                curAmount = 0;
                status = -1;
            }
        }
    }

    /// <summary>Returns a copy with the same id, amounts and status.</summary>
    public Cake clone() {
        return new Cake(type, totalAmount, id, curAmount, status) ;
    }

    public override string ToString() {
        return string.Format("{{ id: {0}, type: {1}, total: {2:0.0}, cur: {3:0.0}, status: {4}, percent: {5:0.0} }}",
            id, type, totalAmount, curAmount, status, percent);
    }
}


//    
/// <summary>
/// A machine that cooks one cake at a time on its own background thread.
/// Cook() blocks while the machine is busy or still holds an uncollected cake;
/// GetCake() blocks until cooking has finished and returns a copy of the cake.
/// </summary>
public class CakeMachine {
    // Private lock target. The kitchen is never called while this lock is held:
    // Kitchen.getMachine reads isDone while holding the kitchen lock, so taking the two
    // locks in the opposite order here could deadlock.
    private readonly object gate = new object();
    public Kitchen kitchen { get; private set; }
    private Cake curCake = null;
    private bool isCooking = false;
    // True from Cook() until GetCake() (or ShutDown()); keeps a second Cook() from
    // replacing a cake that its owner has not collected yet.
    private bool awaitingPickup = false;

    public CakeMachine(string name, Kitchen kitchen)  {
        this.kitchen = kitchen;
    }

    // Body of the cooking thread: adds 10 units every 100 ms until the cake is complete.
    private void Run() {
        Cake cake;
        lock (gate) {
            cake = curCake;
        }
        if (cake != null) {
            while (cake.percent < 1) {
                cake.make(10);
                Thread.Sleep(100); // simulated cooking time
            }
            Debug.LogError("Done!" + cake.id );
            MarkAsDone();
        }
    }

    /// <summary>Waits until the machine is free, then starts cooking the cake on a new background thread.</summary>
    /// <exception cref="System.ArgumentNullException">When cake is null.</exception>
    public void Cook(Cake cake) {
        if (cake == null) {
            // Accepting null would mark the machine busy without anything able to finish it.
            throw new System.ArgumentNullException("cake");
        }
        lock (gate) {
            while (isCooking || awaitingPickup) {
                Monitor.Wait(gate);
            }
            curCake = cake;
            isCooking = true;
            awaitingPickup = true;
            Debug.LogWarning("[Cook] " + curCake.ToString());
            var thread = new Thread(Run);
            thread.IsBackground = true;
            thread.Start();
        }
    }

    /// <summary>
    /// Waits until the machine is no longer cooking and returns a copy of the current
    /// cake, or null when nothing has been cooked yet.
    /// </summary>
    public Cake GetCake() {
        lock(gate) {
            while (isCooking) {
                Monitor.Wait(gate);
            }
            awaitingPickup = false;
            Monitor.PulseAll(gate); // a Cook() call may be waiting for this pickup
            return curCake == null ? null : curCake.clone();
        }
    }

    /// <summary>Marks the machine as idle and drops any pending pickup.</summary>
    public void ShutDown() {
        lock (gate) {
            awaitingPickup = false;
        }
        MarkAsDone();
    }

    /// <summary>Clears the cooking flag, wakes waiters on this machine, then notifies the kitchen.</summary>
    public void MarkAsDone() {
        lock (gate) {
            isCooking = false;
            Monitor.PulseAll(gate);
        }
        // Deliberately outside the lock; see the comment on 'gate'.
        if (kitchen != null) {
            kitchen.MachineReady();
        }
    }

    /// <summary>True while the machine is not cooking.</summary>
    public bool isDone {
        get {
            lock (gate) {
                return !isCooking;
            }
        }
    }
}

//   ,       
/// <summary>Owns a fixed set of cake machines and lets callers wait for one that is not cooking.</summary>
public class Kitchen {
    private readonly object gate = new object(); // waiters in getMachine block on this
    private List<CakeMachine> machines;

    /// <param name="machineNum">Number of machines to create, named "M0", "M1", ...</param>
    public Kitchen(int machineNum) {
        machines = new List<CakeMachine>();
        for (int i = 0; i < machineNum; i++) {
            machines.Add(new CakeMachine("M" + i, this));
        }
    }

    /// <summary>
    /// Returns the first machine whose isDone is true, blocking until MachineReady() is
    /// signalled when there is none. The machine is not reserved for the caller.
    /// </summary>
    public CakeMachine getMachine() {
        lock (gate) {
            CakeMachine free = null;
            while (free == null) {
                free = machines.Find(m => m.isDone);
                if (free == null) { // every machine is busy: wait for a MachineReady signal
                    Monitor.Wait(gate);
                }
            }
            return free;
        }
    }

    /// <summary>Wakes every caller waiting in getMachine so it re-checks the machines.</summary>
    public void MachineReady() {
        lock (gate) {
            Monitor.PulseAll(gate);
        }
    }
}

//     ,         !!       
/// <summary>
/// Bounded ring buffer of cakes shared by makers and eaters:
/// put blocks while the table is full, take blocks while it is empty.
/// </summary>
public class Table {
    private readonly object gate = new object();
    private Cake[] buffer;
    private int tail;  // next slot to write
    private int head;  // next slot to read
    private int count; // number of occupied slots

    /// <param name="_count">Capacity of the table.</param>
    public Table(int _count) {
        this.buffer = new Cake[_count];
        tail = head = count = 0;
    }

    /// <summary>Appends a cake, waiting for a free slot if necessary.</summary>
    public void put(Cake cake) {
        lock (gate) {
            while (count >= buffer.Length) {
                Monitor.Wait(gate);
            }
            buffer[tail] = cake;
            tail = (tail + 1) % buffer.Length;
            count++;
            // Makers and eaters wait on the same monitor, so a single Pulse could wake
            // another maker and leave every eater asleep; wake everyone and let the
            // while-loops re-check their conditions.
            Monitor.PulseAll(gate);
        }
    }

    /// <summary>Removes and returns the oldest cake, waiting for one if the table is empty.</summary>
    public Cake take() {
        lock (gate) {
            while (count <= 0) {
                Monitor.Wait(gate);
            }

            var cake = buffer[head];
            head = (head + 1) % buffer.Length;
            count--;
            Monitor.PulseAll(gate);
            return cake;
        }
    }
}