ゼロからノードを使用して奔流アプリケーションを作成します.


開発のものを学ぶ最良の方法は、行って、それが何であれ、あなた自身のものをつくろうとします.この記事では、ノードjsとswenssonp 2 pライブラリを使用して、Torrentアプリケーションの最小限の例を作成することによって、あなたを歩き回ります.
これを理解するために読んで、コメントすることは、強く勧められます.
だから、Torrentはピアの交換ファイルをp 2 pネットワークです.その背後にある主なアイデアは、1つのファイルが別のピアで表示される可能性があります、チャンキングと分割のダウンロードストリームでは、ピアはファイルのダウンロードをスピードアップすることができます.P 2 Pネットワークはファイルに関するメタ情報を交換するために使用されますが、実際のダウンロードは直接シードに別々のTCP接続を使用します.
この記事ではleechesを実装しませんが、このコードは後にrepoにあります.
さて、まず最初に、私はこのアプリケーションを使用してファイルを共有するエンドユーザーのためのいくつかの種類のインターフェイスを考え出す必要があります.私は、内部のすべてをちょうどインデックスすることに決めましたprocess.cwd() アプリケーションの起動時.
ファイルを保存するには、ファイルのハッシュがキーになるマップを使用することを決めた.また、私は、このプロセスがユーザが彼らがしたいことをするのをブロックするのを望まないと決めました、そして、私は待ちに待たれていないasync機能の中でインデックスを付けました.hashFile 実装はあなた次第です.
const path = require('path');
const { readdir, stat } = require('fs/promises');

const index = new Map();

async function* findFiles (folder) {
  for (let filename of await readdir(folder)) {
    const filepath = path.resolve(folder, filename);
    const filestats = await stat(filepath);

    if (filestats.isDirectory()) {
      yield* findFiles(filepath);
    } else {
      yield { path: filepath, size: filestats.size };
    }
  }
}

;(async () => {
  console.log('Start indexing files...');

  for await (let { path, size } of findFiles(process.cwd())) {
    const [name] = path.split('/').slice(-1);
    const hash = await hashFile(path);

    index.set(hash, { hash, size, name, path });
  }

  console.log(`Directory content indexed, ${index.size} files found`);
})();
次にやりたいことはP 2 Pネットワークを作ることです.私の使用swenssonp2p ジャストコールcreateNode . それはローカルの一般的なP 2 Pネットワークノードを設定するlisten 接続を受け入れ始める.
私はスタートアップの後に何をしているかを正確に知りません、私は多くのものがなければならないと思います、それで、私はイベントエミッタ呼び出し(ソケット)を残します.listen callbackの前に購読できるようにするには、すべての同期コードが完了するまで、listen呼び出しを遅らせます.
const EventEmitter = require('events');
const createNode = require('swenssonp2p');

const main = new EventEmitter();

const node = createNode();
const port = Number(process.argv[2]);

setTimeout(() => {
  node.listen(port, () => main.emit('startup', port));
}, 0);
ノードが上がった後に、私は進んで、ユーザーに彼らが何をすることができるかについて知らせます.私は同じインターフェイスを使用していますchat application , しかし、私は、私がどんなコマンドを持っているかを正確に知りません.
main.on('startup', (port) => {
  console.log(`Node is up on ${port}.`);
  console.log('');

  main.emit('help');

  process.stdin.on('data', (data) => main.emit('command', data.toString()));
});
最初のコマンドは、チャットアプリケーションだけでなく、connect コマンド.
main.on('help', () => {
  console.log('  write "connect IP:PORT" to connect to other nodes on the network.');
});

main.on('command', (text) => {
  if (text.startsWith('connect')) {
    const ipport = text.substr(8);
    const [ip, port] = ipport.split(':');

    console.log(`Connecting to ${ip} at ${Number(port)}...`);
    node.connect(ip, Number(port), () => {
      console.log(`Connection to ${ip} established.`);
    });
  }
});
今、私はユーザーが最初にファイルを検索できるようにしたい.私は名前で検索を実装するだけです、しかし、あなたは同様にこのコマンドで他のパラメタを加えることができます.また、インデックスは私たちがファイルを探すのを助けません、しかし、我々は後でそれを使います、私は約束します.
main.on('help', () => {
  console.log('  write "search FILENAME" to look for files.');
});

// Once the command arrives, we broadcast the search message on the network
main.on('command', (text) => {
  if (text.startsWith('search')) {
    const searchRequest = text.substr(7).trim();

    console.log(`Searching for file by "${searchRequest}"...`);
    node.broadcast({ type: 'search', meta: searchRequest });
  }
});

// Once we receive this message (on another node), we reply with results
node.on('broadcast', ({ origin, message: { type, meta }}) => {
  if (type === 'search' && origin !== node.id) {
    for (let key of index.keys()) {
      const data = index.get(key);

      if (data.name.toLowerCase().includes(meta.toLowerCase())) {
        node.direct(origin, { type: 'search/response', meta: data });
      }
    }
  }
});

// Once we receive the response from the file holder, we display it
node.on('direct', ({ origin, message: { type, meta: { name, size, hash } }}) => {
  if (type === 'search/response') {
    console.log(`  ${name} ${formatSize(size)} ${hash}`);
  }
});
この卓球形式の流れは実装するのが簡単ですが、理論上、私たちが受け取ることができるように不安定に感じますsearch/response を返します.ログ.私はこの問題を考慮しません、しかし、ここの安全チェックは傷つきません.
次に私がしたいことは、ユーザーがダウンロードを開始できるようにすることです.ハッシュはインデックスに使用されるので、コマンドのparamとして使用できます(これは、ファイルハッシュを持つマグネットリンクを作成することができますし、アプリケーションを検索せずにダウンロードするよう要求します).
私は、今すぐダウンロードを開始するときに何をするかわからないので、私はそこにソケットを残します.
main.on('help', () => {
  console.log('  write "download HASH" to start downloading file');
});

main.on('command', (text) => {
  if (text.startsWith('download')) {
    const hash = text.substr(9).trim();

    main.emit('download', hash);
  }
});
ファイルをダウンロードするために、私たちは、彼らからのピアと要求チャンクに別々のTCP接続を確立しなければなりません.チャンクとファイル名の量は、検索コマンドで受信したかもしれませんが、ローカルに持っている情報ではありません.まず最初に、ダウンロードを開始する前に、ファイルのメタ情報を交換するピンポンフローを設定します.これは検索フローと同じですが、最終的にはdownloads そして、彼らが変わるならば、イベントを放出してください.
ご覧のように、交換情報もシードのIPアドレスを含んでいますので、後でダウンロードしながらファイルサーバーに接続できます.
const downloads = {};

main.on('download', (hash) => {
  node.broadcast({ type: 'download', meta: hash });
});

node.on('broadcast', ({ origin, message: { type, meta } }) => {
  if (type === 'download' && origin !== node.id) {
    const data = index.get(meta);

    if (!!data) {
      node.direct(origin, { type: 'download/response', meta: { ip: Array.from(node.addresses)[0], hash: data.hash, size: data.size, name: data.name } })
    }
  }
});

node.on('direct', ({ origin, message: { type, meta } }) => {
  if (type === 'download/response') {
    if (!downloads[meta.hash]) {
      downloads[meta.hash] = {
        hash,
        name: meta.name,
        size: meta.size,
        seeds: [meta.ip],
        chunks: [],
      };

      main.emit('download/ready', meta.hash);
    } else {
      downloads[meta.hash].seeds.push(meta.ip);
      main.emit('download/update', meta.hash);
    }
  }
});
さて、今では、ファイルのデータ要求に応答し、データを送信するTCPサーバーを作成する時間です.私たちはデータをチャンクに交換するので、ファイルサーバは1つの特定のタイプのメッセージに反応して、1つのタイプのメッセージを返す必要があるだけです.
const FILES_SERVER_PORT = 9019;
const CHUNK_SIZE = 512;

const filesServer = net.createServer((socket) => {
  socket.on('data', (data) => {
    const { hash, offset } = JSON.parse(data);
    const meta = index.get(hash);

    const chunk = Buffer.alloc(CHUNK_SIZE);
    const file = await open(meta.path, 'r');

    await file.read(chunk, 0, CHUNK_SIZE, offset * CHUNK_SIZE);
    await file.close();

    socket.write(JSON.stringify({ hash, offset, chunk }));
  });
}).listen(FILES_SERVER_PORT);
さて、実際のダウンロードを実装する時です.〜に反応して始めようdownload/ready イベントと非同期ループを作成すると、並列にシードからチャンクを取得し、1つのシードを一度に1つのチャンクを取得します.
どのような状態が何チャンクを追跡するために、私はchunks フィールドの状態とソケットのデータをダウンロードしてからデータを使用している.
main.on('download/ready', async (hash) => {
  downloads[hash].chunks = [...new Array(Math.ceil(downloads[hash].size / CHUNK_SIZE))].map(() => ({ state: 0 }));
});
それに加えて、一時的なファイルをダウンロードしておく必要があります.
downloads[hash].path = path.resolve(DOWNLOADS_PATH, `${hash}.download`);

const file = await open(downloads[hash].path, 'w');
今、私はIPアドレスに接続する必要がありますdownloads 私は一度知っているdownload/ready イベントは引き出されます、すでに若干のものがあります、しかし、私も反応しなければなりませんdownload/update イベントをリストを更新します.私はこのイベントにリスナーを添付し、ダウンロードが完了するとそれを分離します.
const sockets = {};

const updateSocketsList = async ($hash) => {
  if ($hash !== hash) {
    return;
  }

  for (let ip of downloads[hash].seeds) {
    if (!sockets[ip]) {
      const socket = new net.Socket();

      socket.connect(FILES_SERVER_PORT, ip, () => {
        sockets[ip] = { socket, busy: false };
      });
    }
  }
};

updateSocketsList(hash);

main.on('download/update', updateSocketsList);

// ... TODO

main.off('download/update', updateSocketsList);
メインサイクルはかなりシンプルで、チャンク状態を探しています0 準備完了.1 がダウンロードされ、2 をダウンロードしてソケットには、それはビジー状態ではありません.ソケットがない場合(それらのすべてがビジー状態である)またはチャンク(それらがすべてダウンロードされていることを意味する)がなければ、私はちょうどcontinue 50 ms遅延後.両方の利用できるチャンクとソケットが提示されるならば、私はダウンロードするが、このダウンロードを終えるのを待ちません.
while (!!downloads[hash].chunks.find((chunk) => chunk.state !== 2)) {
  const availableChunkIndex = downloads[hash].chunks.findIndex((chunk) => chunk.state === 0);
  const availableSocket = Object.values(sockets).find(({ busy }) => !busy);

  if (!availableSocket || !availableChunkIndex) {
    await new Promise((resolve) => setTimeout(() => resolve(), 50));
    continue;
  }

  availableSocket.busy = true;
  downloads[hash].chunks[availableChunkIndex].state = 1;

  ;(async () => {
    const chunk = await downloadChunk(availableSocket.socket, hash, availableChunkIndex);

    await file.write(Buffer.from(chunk), 0, CHUNK_SIZE, availableChunkIndex * CHUNK_SIZE);

    downloads[hash].chunks[availableChunkIndex].state = 2;
    availableSocket.busy = false;
  })();
}
ご覧のように、私はdownloadChunk ソケットからデータを取得する関数.ソケットはイベントエミッタであり、以下のようにする必要があります.
const downloadChunk = (socket, hash, offset) => new Promise((resolve) => {
  socket.write(JSON.stringify({ hash, offset }));

  const listener = (message) => {
    if (hash === message.hash && offset === message.offset) {
      resolve(message.chunk);
      socket.off('data', listener);
    }
  };

  socket.on('data', listener);
});
今、私はファイルハンドルを閉じることによってクリーンアップする必要があるだけで、必要なファイル名にテンポラリファイルをリネームし、リスナーをdownload/update シードソケットを閉じます.
await file.close();
await rename(downloads[hash].path, path.resolve(DOWNLOADS_PATH, downloads[hash].name));

main.off('download/update', updateSocketsList);

for (let { socket } of Object.values(sockets)) {
  socket.destroy();
}
これはどのように以下のノードを使用してコードの以下の300行で最も簡単な奔流アプリケーションを作ることができますswenssonp2p . このアプリの完全なコードを見つけることができますhere .