【ノード.js学習ノート】async/awaitグローバル制御同時数量

16836 ワード

前言
会社の業務はNodeを使う必要がある.jsはRedisタスクを消費するサービスを開発したが、nodeに慣れていないからだ.jsにおける非同期処理は,多くの回り道をしたが,同時にいくつかの関連知識を学んだ.最初のデザインの考え方は、Nodeを使っているからです.jsが書いた最初のビジネスコードなので、考え方が未熟です.
疑似コードは次のとおりです.
async consume() {
    try {
        let task = await this.getTask()
        if (task) {
        	...
            await this.handleTask(task)
            return true
        }
    } catch (error) {
        console.trace(error)
        return false
    }
    return false
}

async handleTask(task) {
	//...
	for(let i = 0;i<len;i++){
		await this.doSomething(taskData[i])
	}
	//...
}

async doSomething() {
	//          
	//...
}

思い出してみろjsは単一スレッドに属しており,この最初のコードではネットワーク関連の操作に時間がかかるが,Redisからタスクを受け取った後awaitで初歩的な消費を行うだけでなく,awaitで元の非同期のネットワーク要求操作を待つ.そのため、サービス全体が単一スレッド、単一タスクのみを走り、ネットワークリクエストが完了した後に消費を順番に待つようになった.基本テスト10 W+タスクでは,約1 Kリクエストのみで数分間消費した.この単機消費能力は耐えられない.そして、以下のように修正しました.
疑似コードは次のとおりです.
async consume() {
    try {
        let task = await this.getTask()
        if (task) {
        	...
            await this.handleTask(task)
            return true
        }
    } catch (error) {
        console.trace(error)
        return false
    }
    return false
}

async handleTask(task) {
	//...
	for(let i = 0;i<len;i++){
		this.doSomething(taskData[i]).catch(()=>{
			//handle some error
		})
	}
	//...
}

async doSomething() {
	//          
	//...
}

余談
        ,Redis       ,            ,    5k    3w。     ,          ,       ,               API  ,       ,    。        。
  ,           ,        , handleTask doSomething  ,             ,         ,     doSomething          。

上記のコードは、ネットワークリクエスト前のawaitを削除しただけで、結果やエラー処理を待つ必要があるいくつかの操作をcallbackまたはグローバル変数で実現していることが明らかです.テストが始まると、スピードが速いことに気づきました.数十万の任務量が入ってきて、20秒ぐらいで消費が完成したことを示しています.しかし、その後、httpネットワークリクエストのタイムアウトの異常が山積した(インタフェース側はリクエストを受け付け、たまたますべて消費に成功したが、ネットワークIOなどの問題でローカルHTTP呼び出しはすべてタイムアウトを表示した).これは、タスク処理ステータスの追跡に非常に不利です.もちろん,コード中の比較的時間のかかるネットワーク操作を直接非同期処理したためであることは明らかである.走っている間に、本機のCPUをほぼ満タンにしてしまいました.このような操作も非常に頼りにならないので、これを制限しなければなりません.そうしないと、不確実性が高すぎます.その後、前のWindows開発の時、スレッドプールなどのスレッド同時制限があったことを思い出します.しかし、たぶん検索しました.Node.js対マルチスレッドという概念は主流の処理方式ではなくasync.を通じているようだ.mapLimit、async.queue、イベントなどで同時制御を行います.しかし、開発アーキテクチャが基本的に決まっているため、ネット上でいくつかの参考方法を探したところ、一定の局所性があるか、膨大で複雑であることが分かった.例えばasync.mapLimitはデータを配列に構成し,反復消費を必要とし,データの再編成が面倒であるほか,グローバルな同時数制御が困難である.最後に、ネット上の資料を参考にしました.グローバル同時制御は次のように行われた.
疑似コードは次のとおりです.
const MAX_CONCURRENCY = 20 //     
let lock = [] //   
let currentConcurrency = 0 //     

async consume() {
    try {
        let task = await this.getTask()
        if (task) {
        	...
            await this.handleTask(task)
            return true
        }
    } catch (error) {
        console.trace(error)
        return false
    }
    return false
}

async handleTask(task) {
	//...
	for(let i = 0;i<len;i++){
		//       ,  await     , promise  resolve    ,      
		if (this.currentConcurrency >= MAX_CONCURRENCY) {
           let _resolve
           await new Promise((resolve,reject)=> {
               _resolve = resolve
               this.lock.push(_resolve)
           })
        }
        
        this.currentConcurrency++ //              ,      
		this.doSomething(taskData[i]).catch((error)=>{
			//handle some error
			this.currentConcurrency-- //        ,     
			this.lock.length && this.lock.shift()() //  lock          ,       
		})
	}
	//...
}

async doSomething() {
	//          
	//...
	this.currentConcurrency-- //        ,     
	this.lock.length && this.lock.shift()()  //  lock          ,       
}

ここでは、グローバル変数により現在の制御実行のコンカレント数を記録し、制御が必要なコンカレントドメインを実行する前に、現在許容されている最大コンカレント数を超えているか否かを判断し、最大コンカレント数より大きい場合はawaitでブロックし、promiseのresolveをキューに押し込み、起動実行を待つが、1つのコンカレントタスクが完了すると、コンカレント数を更新し、awaitでブロックしてキューに押し込んだresolveを取り出してトリガを実行し、起動作用を行い、グローバルで実行されるタスクを継続します.上の同時制御ブロック部分のコード、同時タスクが完了した後に処理を更新し、次のタスクを起動し続けるコード.この2つの部分は、それぞれ関数にカプセル化され、スレッドプールのようなものを構成することができる.これは最良の解決策ではないかもしれませんが、現在の状況では、比較的直感的で、理解しやすく、使用しやすい案と言えるはずです.より優れたソリューションを提供することを歓迎します.