プログラマパス:スレッドプールをカスタマイズしてドキュメントのトランスコードを実現

64828 ワード

背景
我が社はずっと前に、あるずっと前の同僚が1つのドキュメントを書いてピクチャーのサービスを回転したことがあって、具体的な業務は以下の通りです:
ユーザーはクライアントでドキュメントをアップロードして、ppt、word、pdfなどのフォーマットで、ユーザーはクライアントでアップロードできるドキュメントをアップロードして完成して、プレビューの時に採用したのはピクチャーの形式です(私に別の方法でプレビューすると言わないで、今はもう間に合わない)ユーザがクラウドにドキュメントをアップロードすると(アリクラウド)、ドキュメントに関する情報をデータベースに記録し、トランスコード完了を待つ.
サーバーは1つのトランスコードサービス(実は1つのwindows service)がひっきりなしにトランスコード待ちのデータを訓練して、もしトランスコード待ちのデータがあれば、データベースから取り出して、それからドキュメントのネットアドレスによって現地にダウンロードしてトランスコード(複数の画像に変換)を行います
ドキュメントのトランスコードが完了すると、トランスコードされたピクチャをクラウドにアップロードし、クラウドピクチャの情報をデータベースに記録する.
クライアントにプレビューが必要な場合は、データベースに基づいてトランスコードに成功したか否かを判断し、成功した場合はデータを取得して表示する.
ドキュメントプレビューの全体的なプロセスは、上記のように、古いトランスコードサービスでは現在どのような問題がありますか?
1つのドキュメントは同時に1つのスレッドでしかトランスコード操作できないため、古いサービスはトランスコードされるデータをパイプに分割する思想を採用し、全部で6つのパイプがあり、データベースにマッピングすると大体Id="パイプIDという形になる.
プロファイル情報に基づいて、パイプがトランスコードされる文書を読み出し、単一スレッドでトランスコード操作を行うコンソールプログラム.
全部で6つのパイプがあるので、サーバーには6つのcmdの黒いウィンドウが...一部のドキュメントがフォーマットの問題やその他の問題でトランスコード中に引っかかる場合がありますが、具体的にはトランスコード操作が停止しています.
プログラムが詰まっている場合は、運転者がトランスコードcmdウィンドウを再起動する必要があります(このメンテナンスは卵が痛いです)その後、偶然にも、このプログラムのメンテナンスが落ちた野菜の頭の上で、1週間ほどメンテナンスして、約10回も再起動して、やっと我慢できなくて、もう一度やりましょう.よく分析した後、実際のドキュメントのトランスコードの核心操作のほかに、トランスコードプロセス全体には多くの注意点があります.
トランスコードサービスが詰まっていないことを保証する必要があり、以前と同じようにを再設計する必要はありません.
複数のプロセスを開くことをできるだけ避ける方法ですが、実際には、このビジネスシーンでは、複数のプロセスと複数のスレッドの役割が一致しています.
各ドキュメントは1回しかトランスコードできません.1つのドキュメントが複数回トランスコードされると、サーバリソースが浪費されるだけでなく、データが一致しない場合もあります.
トランスコードに失敗した文書は、1回の失敗が2回目の失敗を表すものではないため、失敗した文書が再び操作される機会を与えなければならない.
プログラムは絶えずドキュメントをトランスコードしてコストのピクチャーを使うため、だからこれらのファイルがトランスコードして完成してサーバーの上で削除することを保証する必要があって、さもなくば、時間が長くなって多くの役に立たないファイルを生成することができます
こんなにたくさん言ったのに、実は注意しなければならない点がたくさんあります.全体のトランスコードプロセスで言えば、本質的にはタスクプールの生産と消費の問題であり、タスクプールの中のタスクはトランスコード待ちのドキュメントであり、生産者はトランスコード待ちのドキュメントをタスクプールに捨て続け、消費者はタスクプールの中のドキュメントをトランスコードし続けて完成する.
スレッドプール
これは明らかにスレッドプールと似ていて、料理の前にスレッドプールの文章を書いたことがあります.興味のある学生は歴史をめくることができます.今日はこのスレッドプールでこのトランスコード問題を解決します.スレッドプールの本質は,一定数のスレッドを初期化し,タスクを継続的に実行することである.
 //      
    public class LXThreadPool:IDisposable
    {
        bool PoolEnable = true//  
        List ThreadContainer = null//
        ConcurrentQueue JobContainer = null//
        int _maxJobNumber; // job

        ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string, DateTime>(); //job , job 


        public LXThreadPool(int threadNumber,int maxJobNumber=1000)
        
{
            if(threadNumber<=0 || maxJobNumber <= 0)
            {
                throw new Exception(" ");
            }
            _maxJobNumber = maxJobNumber;
            ThreadContainer = new List(threadNumber);
            JobContainer = new ConcurrentQueue();
            for (int i = 0; i             {
                var t = new Thread(RunJob);
                t.Name = $" {i}";
                ThreadContainer.Add(t);
                t.Start();
            }
            //
            var tTimeOutJob = new Thread(CheckTimeOutJob);
            tTimeOutJob.Name = $" ";
            tTimeOutJob.Start();
        }

        // ,
        public int AddThread(int number=1)
        
{
            if(!PoolEnable || ThreadContainer==null || !ThreadContainer.Any() || JobContainer==null|| !JobContainer.Any())
            {
                return 0;
            }
            while (number <= 0)
            {
                var t = new Thread(RunJob);
                ThreadContainer.Add(t);
                t.Start();
                number -= number;
            }
            return ThreadContainer?.Count ?? 0;
        }

        // , 0:    1:
        public int AddTask(Action<object> job, object obj,string actionId, Action errorCallBack = null)
        
{
            if (JobContainer != null)
            {
                if(JobContainer.Count>= _maxJobNumber)
                {
                    return 0;
                }
                // 10
                var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10)                 if(timeoOutJobList!=null&& timeoOutJobList.Any())
                {
                    foreach (var timeoutJob in timeoOutJobList)
                    {
                        JobIdList.TryRemove(timeoutJob.Key,out DateTime v);
                    }
                }

                if (!JobIdList.Any(s => s.Key == actionId))
                {
                    if(JobIdList.TryAdd(actionId, DateTime.Now))
                    {
                        JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, ErrorCallBack = errorCallBack });
                        return 1;
                    }
                    else
                    {
                        return 101;
                    }
                }
                else
                {
                    return 100;
                }            
            }
            return 0;
        }  

        private void RunJob()
        
{
            while (JobContainer != null  && PoolEnable)
            {

                //
                ActionData job = null;
                JobContainer?.TryDequeue(out job);
                if (job == null)
                {
                    //
                    Thread.Sleep(20);
                    continue;
                }
                try
                {
                    //
                    job.Job.Invoke(job.Data);
                }
                catch (Exception error)
                {
                    //
                    if (job != null&& job.ErrorCallBack!=null)
                    {
                        job?.ErrorCallBack(error);
                    }

                }
                finally
                {
                    if (!JobIdList.TryRemove(job.ActionId,out DateTime v))
                    {

                    }
                }
            }
        }

        //
        public void Dispose()
        
{
            PoolEnable = false;
            JobContainer = null;
            if (ThreadContainer != null)
            {
                foreach (var t in ThreadContainer)
                {
                    // ,
                    t.Join();
                }
                ThreadContainer = null;
            }
        }

        //
        private void CheckTimeOutJob()
        
{
            // 10
            var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10)             if (timeoOutJobList != null && timeoOutJobList.Any())
            {
                foreach (var timeoutJob in timeoOutJobList)
                {
                    JobIdList.TryRemove(timeoutJob.Key, out DateTime v);
                }
            }
            System.Threading.Thread.Sleep(60000);
        }
    }
    public class ActionData
    {
        // id,
        public string ActionId { getset; }
        //
        public object Data { getset; }
        //
        public Action<object> Job { getset; }
        //
        public Action ErrorCallBack { getset; }
    }

以上がスレッドプールの具体的な実装であり、具体的なビジネスとは関係なく、スレッドプールに適したシーンに完全に使用できます.その中に注意点があります.私は新しいタスクの表示を追加しました.主に重複したタスクが複数回投入されたことを排除するために使用されています(実行中のタスクのみを排除します).もちろんコードは最適ではありません.必要な学生は自分で最適化することができます.
スレッドプールの使用
次に、上記のスレッドプールを使用してドキュメントのトランスコードタスクを完了します.まず、起動時にスレッドプールを初期化し、独立したスレッドを起動してスレッドプールにタスクを送り続けます.ついでに、タスクを送信するスレッドを監視するモニタリングスレッドが発生しました.
       string lastResId = null;
        string lastErrorResId = null;

        Dictionary<stringint> ResErrNumber = new Dictionary<stringint>(); //
        int MaxErrNumber = 5;// 10
        Thread tPutJoj = null;
        LXThreadPool pool = new LXThreadPool(4,100);
        public void OnStart()
        
{
            //
            tPutJoj = new Thread(PutJob);
            tPutJoj.IsBackground = true;
            tPutJoj.Start();

            //  
            var tMonitor = new Thread(MonitorPutJob);
            tMonitor.IsBackground = true;
            tMonitor.Start();
        }
       // job
        private void MonitorPutJob()
        
{
            while (true)
            {
                if(tPutJoj == null|| !tPutJoj.IsAlive)
                {
                    Log.Error($" ==========");
                    tPutJoj = new Thread(PutJob);
                    tPutJoj.Start();
                    Log.Error($" ==========");
                }
                System.Threading.Thread.Sleep(5000);
            }

        }

        private void PutJob()
        
{           
            while (true)
            {
                try
                {
                    //
                    var fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Wait }, 30, lastResId);
                    Log.Error($" === :lastResId:{lastResId}, :{fileList?.Count() ?? 0}");
                    if (fileList == null || !fileList.Any())
                    {
                        lastResId = null;
                        Log.Error($" 0, , ==========");
                        // ,  
                        fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);
                        if (fileList == null || !fileList.Any())
                        {
                            lastErrorResId = null;
                        }
                        else
                        {
                            // Log.Error($" :{JsonConvert.SerializeObject(fileList)}");
                            List errFilter = new List();
                            foreach (var errRes in fileList)
                            {
                                if (ResErrNumber.TryGetValue(errRes.res_id, out int number))
                                {
                                    if (number > MaxErrNumber)
                                    {
                                        Log.Error($" :{errRes.res_id}  {MaxErrNumber} , ===========");
                                        continue;
                                    }
                                    else
                                    {
                                        errFilter.Add(errRes);
                                        ResErrNumber[errRes.res_id] = number + 1;
                                    }
                                }
                                else
                                {
                                    ResErrNumber.Add(errRes.res_id, 1);
                                    errFilter.Add(errRes);
                                }
                            }
                            fileList = errFilter;
                            if (fileList.Any())
                            {
                                lastErrorResId = fileList.Select(s => s.res_id).Max();
                            }
                        }
                    }
                    else
                    {
                        lastResId = fileList.Select(s => s.res_id).Max();
                    }

                    if (fileList != null && fileList.Any())
                    {
                        foreach (var file in fileList)
                        {
                            //   ,
                            int poolRet = 0;
                            while (poolRet <= 0)
                            {
                                poolRet = pool.AddTask(s => {
                                    AliFileService.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instance(file.res_ext));
                                }, file, file.res_id);
                                if (poolRet <= 0 || poolRet > 1)
                                {
                                    Log.Error($" ========== :{poolRet}");
                                    System.Threading.Thread.Sleep(1000);
                                }
                            }
                        }
                    }
                    //
                    System.Threading.Thread.Sleep(3000);
                }
                catch
                {
                    continue;
                }

            }
        }

以上がリリースタスクです.スレッドプールはタスクを実行するすべてのコードです.具体的なトランスコードはプライバシーにかかわるため、ここでは提供されていません.必要があれば、プライベートで野菜を探して要求することができます.もっと優れた方法があることはよく知っていますが、スレッドプールのような考えは一部の人に役立つかもしれません.タスクタイムアウトのコアコードは次のとおりです(pollyプラグインが採用されています):
 var policy= Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut), onTimeout(context, timespan, task) =>
                {
                    ret.State=Enum.FileToImgStateEnum.TimeOut;                   
                });
                policy.Execute(s=>{
                    .....
                });

あなたのより良い案を伝言エリアに書きましょう.2020年にはみんながますますよくなります.