Nodejsでデータの叙事詩量を処理する方法


序文


あなたが私のような場合は、HTTPリクエストを扱うように、さまざまなものの多くのためにNODEJSを使用するように、CLIツール、IOTなどを作るように.あなたは、あなたが同時に複数のHTTPリクエストを送信する必要がある状況には、おそらく実行されている場合は、していない場合は心配しないでください1日はあなたがするので.JavaScriptの非同期の性質を使用して、ほとんどの経験豊富な開発者は、同時にHTTPのリクエストのカップルを送信することができます.しかし、何が何百万のHTTPリクエストを送信する必要が発生しますか?この問題は、大部分の人々が大きなデータを処理しているあまりに頻繁に対処する必要がないという問題に触れるので、味付けされたJavaScript開発者さえ旅行しそうです.
あなたが非同期的に100万のHTTPリクエストを送信しようとした場合、あなたのプログラムはクラッシュし、推測は正しいだろうと推測している可能性があります.実際、あなたのプログラムはたぶん100万のHTTPリクエストの前にクラッシュするでしょう.何かが非同期であるので、それがデータの無限の量を扱うことができるというわけではありません.この記事の残りの部分では、システムリソースを使い果たすことのない効率的な方法で任意のサイズのデータを扱う方法を示します.私たちはNodeJS Streams それは私たちの秘密のソースですので、あなたがストリームへのガイドが必要な場合this is my favorite article . その記事とは異なり、どのようにストリームを高レベルを除いて動作するようにダイビングを計画していない代わりに、私の目標は、ストリームを使用して大きなデータを処理する実用的な例を与えることです.

完成したコードにまっすぐに


あなたが急いでいるか、読むのを気にしないならば、ここで我々が構築されるものの完成したGithub倉庫です.

イタイムIII / NODEJS叙事詩データ



NODEJSのデータの叙事詩量を処理する方法


View on GitHub

何をビルドする

  • 私たちはファイルからgithubユーザ名のリストを読んでいます
  • それぞれのGithubのユーザ名でgithub APIを呼び出し、リポジトリの一覧を取得します.私たちは、GithubのAPIを詰め込んでいる読者の束が欲しくないので、12のユーザー名のより小さなリストで働くだけです、そして、概念がどんなデータのためにでも同じことであるので.
  • このデータをデータベースに書き込みますが、この手順のセットアップの複雑さを避けるために、ファイルにデータを書きます.
  • 最後に、我々は物事をよりパフォーマンスを作るためにリファクタリングされます.
  • 私たちはすべてのNODEJSストリームを使用して、これは正しく行われた場合の概念を行っているbackpressuring これは、メモリの不足しない方法でNodeJSリソースを使用するのに役立ちます.

    1 . githubのユーザ名の読み込み


    このファイルは、repository
    src/mainjs
    const fs = require('fs')
    const path = require('path')
    const CsvParser = require('csv-parse')
    
    const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
    const csvParser = new CsvParser({})
    
    let githubUsernames = ''
    readGithubUsernamesStream
      .pipe(csvParser)
      .on('data', (data) => githubUsernames += data)
      .on('end', () => console.log(githubUsernames))
    
    // Outputs - itmayziii,dhershman1,HetaRZinzuvadia,joeswislocki,justinvoelkel,mandarm2593,mfrost503,wmontgomery,kentcdodds,gaearon,btholt,paulirish,ryanflorence
    
    

    2 . githubからのリポジトリの一覧を取得する


    NodeJSが私たちに提供してくれたのは良かったcreateReadStream ストリームとしてファイルを読むには、現在のユーザー名のリストを取るために独自のストリームが必要になります.
    このステップについてはaxios HTTPリクエストを作成する
    ユーザ名をgithub reposに変換します.js
    const axios = require('axios')
    const stream = require('stream')
    
    module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
      constructor (options = {}) {
        super({ ...options, objectMode: true })
      }
    
      _transform (chunk, encoding, callback) {
        const username = chunk
        this.getGithubRepositoriesForUser(username)
          .then((response) => {
            let repositories = []
            if (response.data) {
              repositories = response.data.map((repository) => repository.name)
            }
    
            this.push(JSON.stringify({
              username,
              repositories
            }))
            callback()
          })
          .catch(callback)
      }
    
      getGithubRepositoriesForUser (username) {
        return axios.get(`https://api.github.com/users/${username}/repos`, {
          headers: {
            Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
          }
        })
      }
    }
    
    
    src/mainを変更します.js
    const fs = require('fs')
    const path = require('path')
    const CsvParser = require('csv-parse')
    const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
    
    const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
    const csvParser = new CsvParser({ columns: false })
    const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
    
    let githubUserRepositories = []
    readGithubUsernamesStream
      .pipe(csvParser)
      .pipe(transformUsernameToGithubRepos)
      .on('data', (data) => githubUserRepositories.push(data))
      .on('end', () => console.log(githubUserRepositories))
    
    
    私たちはそこでたくさんのことを変えました.我々は、Aを持っている変換ストリームを作成しました_transform 方法.この変換ストリームにCSVファイルを送るとき_transform メソッドが呼び出されます.一度_tranform ユーザ名が渡されたメソッドが呼び出され、ユーザ名を受け取り、そのユーザのリポジトリに対してGithubに問い合わせます.私たちは結果をストリームの次のものに送りますthis.push(...) . 我々は蒸気パイプラインの次のステップをまだ持っていないので、我々はdata イベントは、我々はデータを収集し、メインのログ.js

    3 .ユーザリポジトリをファイルに書き込む


    src/mainjs
    const fs = require('fs')
    const path = require('path')
    const CsvParser = require('csv-parse')
    const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
    
    const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
    const csvParser = new CsvParser({ columns: false })
    const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
    const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))
    
    let githubUserRepositories = []
    readGithubUsernamesStream
      .pipe(csvParser)
      .pipe(transformUsernameToGithubRepos)
      .pipe(writeStream)
      .on('end', () => process.exit())
    
    
    これは簡単なステップであり、TXTファイルに書き込みストリームの書き込みを作成しました.

    リファクタリング


    私たちが働いているが、それは理想から遠く離れている.あなたがコードを見るならば、それはひどく非能率的です.
  • それは一度に1つのHTTPリクエストを行うだけで、我々は100万のHTTPリクエストを同時に行うことができないので、我々は多分100を行うことはできません意味しません.この例では、デモンストレーション目的のためにパイプラインウォークスルーごとに5に制限します.
  • コードにもエラーがない
  • 前に進んで、これらのことを修正します
    ユーザ名をgithub reposに変換します.js
    const axios = require('axios')
    const stream = require('stream')
    
    module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
      constructor (options = {}) {
        super({ ...options, objectMode: true })
        this.requests = []
      }
    
      _transform (chunk, encoding, callback) {
        const username = chunk[0]
        const githubRequest = this.getGithubRepositoriesForUser(username)
        this.requests.push(this.prepareGithubRequest(username, githubRequest))
        if (this.requests.length < 5) {
          return callback()
        }
    
        this.processRequests(callback)
      }
    
      _flush (callback) {
        this.processRequests(callback)
      }
    
      getGithubRepositoriesForUser (username) {
        return axios.get(`https://api.github.com/users/${username}/repos`, {
          headers: {
            Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
          }
        })
      }
    
      prepareGithubRequest (username, githubRequest) {
        return githubRequest
          .then((response) => {
            let repositories = []
            if (response.data) {
              repositories = response.data.map((repository) => repository.name)
            }
    
            return {
              username,
              repositories
            }
          })
      }
    
      processRequests (callback) {
        return Promise.all(this.requests)
          .then((responses) => {
            this.requests = []
    
            this.push(responses.reduce((accumulator, currentValue) => {
              return accumulator + JSON.stringify(currentValue)
            }, ''))
            callback()
          })
          .catch(callback)
      }
    }
    
    再び私たちはたくさんやったので、何が起こったupackましょう.我々は、我々を変えました_tranform Github APIを呼び出して、その約束を配列に押しつぶす方法.基本的に、私たちは、トランスフォーメーションがメソッドで見つかるストリームを通してデータをプッシュするように言う前に、Githubを5回呼び出していますprocessRequests . 我々は、1つの代わりに5つの要求を処理するためにパイプラインを変えました.
    我々が100万のレコードを処理していて、我々の番号として5の代わりに我々が100を使用したならば、我々が想像することができたように、我々は同じ時間の近くで100のHTTP要求を送って、彼らを100以上を送る前にすべてを解決するのを待ちます.これは、大量のデータを処理する効率的/リソースを節約する方法です.
    しかし、我々はまだより良いエラー処理をする必要がありますpipeline 関数.

    pipeline - A module method to pipe between streams forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.


    src/mainjs
    const fs = require('fs')
    const path = require('path')
    const CsvParser = require('csv-parse')
    const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
    const stream = require('stream')
    
    const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
    const csvParser = new CsvParser({ columns: false })
    const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
    const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))
    
    stream.pipeline(
      readGithubUsernamesStream,
      csvParser,
      transformUsernameToGithubRepos,
      writeStream,
      (error) => {
        if (error) {
          console.error('error ', error)
          return process.exit(1)
        }
    
        process.exit()
      }
    )
    

    結論


    NODEJSストリームは、データが1ポイントで開始し、最後まで流れるパイプラインを有効にすることができます.単にNodeJSの既に構築されたストリームを実装することから来るバックプレッシャーを使用して、我々は効率的にデータの非常に大きなセットを処理している間、コンピュータの資源を使用します.私はこの仕事のような方法を知っているので、CSVから1000万以上のレコードを処理し、APIを呼び出して追加のデータを取得し、結果をデータベースに保存します.ストリームは、自分自身で効果的ですが、本当に物事をスピードアップしたい場合はchild processes 最大効率のためのストリーム.
    カバー写真信用unsplash