Hadoop 2.1.0-cdh 4ファイルのソースコードの分析を書きます.

15474 ワード

本文はhadoopに書き込むjavaインタフェースがどのようにして実現されたのかを分析します.本文はnamenodeとdatanodeの部分を重点的に分析することはできません.クライアントの処理に専念します.
 
背景知識:簡単にファイルを書くclient-namenode-datanodeプロセスを紹介します.http://www.cnblogs.com/duguguiyu/archive/2009/02/22/1396034.html
 
まず、ロードFileSystemは、連続hadoopシステムでDisttributed FileSystemを作成します.DisttributedFileSystemのcreateからファイルを作成します.
@Override
  public HdfsDataOutputStream create(Path f, FsPermission permission,
    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
    statistics.incrementWriteOps(1);
    final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
        replication, blockSize, progress, bufferSize);
    return new HdfsDataOutputStream(out, statistics);
  }
私達がjavaのhadoopプログラムを書く時、多く使われているのはhadoop-commonのFsData OutputStreamで、Hdfs Data OutputStreamはFsData OutputStreamを継承しています.このようにパッケージの利点は、FsData OutputStreamを参照するだけでhdfsを操作できるユーザーもいるということです.本当にコアなクラスはDFSOutputStreamです.このようなことを詳しく検討します.
 
お客様はまずFileSystem.reatを呼び出してFsDataOutputオブジェクトを作成します.実際のコアはDFSOutput Streamを作成しました.
/** Construct a new output stream for creating a file. */
  private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum) throws IOException {
    this(dfsClient, src, blockSize, progress, checksum, replication);
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

    computePacketChunkSize(dfsClient.getConf().writePacketSize,
        checksum.getBytesPerChecksum());

    try {
      dfsClient.namenode.create(
          src, masked, dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     DSQuotaExceededException.class,
                                     FileAlreadyExistsException.class,
                                     FileNotFoundException.class,
                                     ParentNotDirectoryException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     UnresolvedPathException.class);
    }
    streamer = new DataStreamer();
  }
1.dfs Cliennt.namenode.creatはnamenodeにcreateを要求して、ファイルを作成します.namenode端末は仮想ファイルシステムに対応するINodeを作成し、対応するleaseを作成します.
2.Data Streamerクラスを作成し、スレッドを起動します.DataStream類は送信データを処理するロジックを専門に担当しています.
 
DataStreamerを作成:
private DataStreamer() {
      isAppend = false;
      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }
初期状態はPIPELINE_です.SETUP_CREATEは、namenodeにファイルを作成するように伝え、blockの書き込みを待つという意味です.
 
次にData Streamerのrun方法を見ます.
// get new block from namenode.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            nodes = nextBlockOutputStream(src);
            initDataStreaming();
blockの割り当てを開始します.
next BlockOutputStreamは、現在接続されているDatanodelistに戻り、initData Streamはトランスポートストリームを初期化する.
do {
        hasError = false;
        lastException = null;
        errorIndex = -1;
        success = false;

        long startTime = Time.now();
        DatanodeInfo[] excluded = excludedNodes.toArray(
            new DatanodeInfo[excludedNodes.size()]);
        block = oldBlock;
        lb = locateFollowingBlock(startTime,
            excluded.length > 0 ? excluded : null);
        block = lb.getBlock();
        block.setNumBytes(0);
        accessToken = lb.getBlockToken();
        nodes = lb.getLocations();

        //
        // Connect to first DataNode in the list.
        //
        success = createBlockOutputStream(nodes, 0L, false);

        if (!success) {
          DFSClient.LOG.info("Abandoning block " + block);
          dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
          block = null;
          DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
          excludedNodes.add(nodes[errorIndex]);
        }
      } while (!success && --count >= 0);
next BlockOutputStreamはnamenodeのようにblockに書き込むために必要なdatanodelistを試みます.そして最初のdatanodeを何度も接続してみます.
具体的には:
1.locateFollowwingBlockはdfs Client.namenode.addBlock(src,dfsClient.client Name,block,excludNodes)を呼び出してnamenodeの帰るLocatedBlockオブジェクトを手に入れます.namenodeはblockid、datanodelistを返します.ちなみに、namenodeはデータをバランスよく書き込みます.安全で、データを読み込む三つの面で適切なdatanodeを選んでユーザーにあげます.
2.createBlockOutputStreamはdatanodelistリストに書き込まれた最初のdatanodeのパイプを開けてみます.datanodeのackを待っています.
続いてinitData Streamingです.
private void initDataStreaming() {
      this.setName("DataStreamer for file " + src +
          " block " + block);
      response = new ResponseProcessor(nodes);
      response.start();
      stage = BlockConstructionStage.DATA_STREAMING;
    }
datanodelistを手に入れました.私たちはReponse Processorを作成できます.機能はdatanodeの戻りのackバッグを処理します.そしてステージをDATA_にセットします.STREAMING.データ入力状態に入る.
 書き込み開始:
ClientはFSS Data Output Streamer.writeに持ってきて実際にFOutput Summerのwriteを呼び出します.
public synchronized void write(int b) throws IOException {
    sum.update(b);
    buf[count++] = (byte)b;
    if(count == buf.length) {
      flushBuffer();
    }
  }
sum.udateはCRC検査の統計を行います.データがこのクラスの内部bufをいっぱい書いたらflushBufferができます.flushBufferは最終的にDFSOutputStreamのwritechunkを呼び出します.
writeChunkに入る前に、次の3つの関連対象を説明します.chunk、packet、block.
chunk:サイズ512.CRC検査の単位.flushごとにchunkが生成されます.内部の構造コードには画像が表示されます.
buf is pointed into like follows:     *  (C is checksum data、D is payload data)     *      * [__________________________________D D D DDD___u_u u u]_]     *           ^        ^               ^               ^      *           |        checksumPos     dataStart       ダタポ     *           checksum Start
各chunkの前はチェックビットで、後は対応するデータです.
packet:サイズ64 kは、複数のchunkからなります.clientがdatanodeに書き込む最小単位です.chunk数量がpacketに含めることができる極限に達した時、client端はchunkを包装して一つのpacketを生成してdataqueueに入れて、Data Streamerに送らせます.
block:blockは私達のうれしいニュースを聞くhadoopの中で実際に保存したファイルのブロックです.サイズは64 Mです.datanodeはchunkを収集して包装して一つずつblockを生成してディスクに書きます.
はい、概念を整理してからwriteChunkの方法を見ます.
 if (currentPacket == null) {
      currentPacket = new Packet(packetSize, chunksPerPacket, 
          bytesCurBlock);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }

    currentPacket.writeChecksum(checksum, 0, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.numChunks++;
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    //
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumChunk(bytesPerChecksum);
      }

      if (!appendChunk) {
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) {
        currentPacket = new Packet(0, 0, bytesCurBlock);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
1.現在のpacketがないとpacketを作成します.
2.packetに書き込む.packet数++に書き込みました.bytes CurBlock値を増加します.
3.現在のpacketがいっぱいになったら、packetをdataQueに入れます.
4.bytes CurBlockがblockSizeに等しい場合、デフォルトの64 Mは空のpacketを送り、lastPacketInBlockとsyncBlockを設定してdatanodeにblockを生成するべきだとdatanodeに教えます.
 
Datastreamerスレッドはデータをどんどんチェックします.packetが入っていることを発見するとdatanodeにデータを書き始めます.
// send the packet
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          try {
            one.writeTo(blockStream);
            blockStream.flush();   
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN 
            errorIndex = 0;
            throw e;
          }
packetはdataQueから取り出してackQueの端に置く.
 
データはすでに書き込みましたが、先にData StreamerがReponse Processserスレッド処理datanodeの戻りを開始します.今からResonseProceserのrun方法を見にきます.
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
          // process responses from datanodes.
          try {
            // read an ack from the pipeline
            ack.readFields(blockReplyStream);
            if (DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("DFSClient " + ack);
            }
            
            long seqno = ack.getSeqno();
            // processes response status from datanodes.
            for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
              final Status reply = ack.getReply(i);
              if (reply != SUCCESS) {
                errorIndex = i; // first bad datanode
                throw new IOException("Bad response " + reply +
                    " for block " + block +
                    " from datanode " + 
                    targets[i]);
              }
            }
            
            assert seqno != PipelineAck.UNKOWN_SEQNO : 
              "Ack for unkown seqno should be a failed ack: " + ack;
            if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
              continue;
            }

            // a success ack for a data packet
            Packet one = null;
            synchronized (dataQueue) {
              one = ackQueue.getFirst();
            }
            if (one.seqno != seqno) {
              throw new IOException("Responseprocessor: Expecting seqno " +
                                    " for block " + block +
                                    one.seqno + " but received " + seqno);
            }
            isLastPacketInBlock = one.lastPacketInBlock;
            // update bytesAcked
            block.setNumBytes(one.getLastByteOffsetBlock());

            synchronized (dataQueue) {
              lastAckedSeqno = seqno;
              ackQueue.removeFirst();
              dataQueue.notifyAll();
            }
1.ack.readFields(blockReplyStream)ブロック式のダタノドストリームからダタノドの戻り値を読みます.
2.レスポンスを受信した後、seqidが現在のseqかどうかを検出します.転送を表す「スライドウィンドウ」でなければエラーが発生します.
3.各datanodeの戻り値を検出します.
4.ackQueからpacketを取り出す
5.datanode.notifyAllはdataQue上に閉塞した操作を継続することができます.
 
ここではデータを書く際の流量制御と信頼性について説明します.clientとdatanodeの通信はTCPに基づいていますが、hadoop理論はどの環境にも問題がある可能性があると強調しています.だからファイルを書く時にTCPのようなスライドウィンドウモデルを使いました.各packetは送信後にackqueueに入ります.datanodeのackを受けるとseqidが正しいかどうかチェックします.packetの強い順序性を保証します.対応するackを受信した後にpacketだけが本当にメモリから削除されます.また、ackqueueを受け取ったら、Data Streamerにpacketを送ることができます.ただし、こちらはTCPのような強い要求ではなく、timeoutとnotifyAllの2つの方法で送信端の速度を制御します.送信側制御を見てください.
synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.now();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                && dataQueue.size() == 0 && 
                (stage != BlockConstructionStage.DATA_STREAMING || 
                 stage == BlockConstructionStage.DATA_STREAMING && 
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try {
                dataQueue.wait(timeout);
              } catch (InterruptedException  e) {
              }
              doSleep = false;
              now = Time.now();
            }
 
最後にまとめます.
1.全体的に言えば、書き込みプロセスは書き込み速度と正確性を総合的に考慮して現在のモードを形成しています.
2.create時はnamenodeにファイル作成を要求し、datanodelistを持ってストリームを開きます.
3.書き込む時はインターフォールを書き込みます.バスシーズを待ってからflushはパッケージに入ります.
4.packetがいっぱいになったら、直接に送るのではなく、dataqueueに入れて、他のスレッドによって処理されます.
5.データの送信を開始した後、Reponse Procescesserスレッドを開いて、datanodeの戻りを非同期に処理するack.
6.定期的に包装し、単独スレッドでデータを送信する方法は、大スループットシステムの古典的なやり方であり、ackは信頼できる伝送を実現する有効な方法である.勉強してみます.