Baiduファイルシステムbfsソース分析シリーズ(二)

15279 ワード

書き込みプロセス(中)
この記事はまだ書き込みプロセスを分析していますが、多くのデータ構造の実現に関連しています.ここではまず2つのクラスBlockMappingManager/FileLockManagerを分析します.一つはblockメタデータを管理するクラスで、一つはロックの実現です.BlockMappingManagerはBlockMappingの管理クラスであり、後者は具体的なblock管理クラスであり、nameserver起動時にコマンドラインパラメータでFLAGS_blockmapping_bucket_numを指定する.
 16 class BlockMappingManager {
 17 public :
 18     BlockMappingManager(int32_t bucket_num);
 19     ~BlockMappingManager();
 46 private:
 47     int32_t blockmapping_bucket_num_;
 48     std::vector block_mapping_;
 49     ThreadPool* thread_pool_;
 50 };

 20 BlockMappingManager::BlockMappingManager(int32_t bucket_num) :
 21     blockmapping_bucket_num_(bucket_num) {
 22     thread_pool_ = new ThreadPool(FLAGS_blockmapping_working_thread_num);
 23     block_mapping_.resize(blockmapping_bucket_num_);
 24     for (size_t i = 0; i < block_mapping_.size(); i++) {
 25         block_mapping_[i] = new BlockMapping(thread_pool_);
 26     }
 27     srand(time(NULL));
 28 }

 33 int32_t BlockMappingManager::GetBucketOffset(int64_t block_id) {
 34     return block_id % blockmapping_bucket_num_;
 35 }
このクラスのすべてのインターフェースは、block_idを使用することによって、最後にGetBucketOffsetを呼び出して、どの特定のBlockMappingに位置するかを特定する.
nameserverが起動すると、構造関数NameServerImpl::NameServerImplで行われます.
  93 void NameServerImpl::CheckLeader() {
  94     if (!sync_ || sync_->IsLeader()) {
  96         NameServerLog log;
  97         std::function task =
  98             std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1);
  99         namespace_->Activate(task, &log);
 100         if (!LogRemote(log, std::function())) {
 102         }
 103         recover_timeout_ = FLAGS_nameserver_start_recover_timeout;
 104         start_time_ = common::timer::get_micros();
 105         work_thread_pool_->DelayTask(1000, std::bind(&NameServerImpl::CheckRecoverMode, this));
 106         is_leader_ = true;
 107     } else {
 108         is_leader_ = false;
 109         work_thread_pool_->DelayTask(100, std::bind(&NameServerImpl::CheckLeader, this));
 111     }
 112 }
namespace_->Activateにおいて、
 52 void NameSpace::Activate(std::function callback, NameServerLog* log) {
 53     //more code...
 75     SetupRoot();
 76     RebuildBlockMap(callback);
 77     InitBlockIdUpbound(log);
 78 }

675 bool NameSpace::RebuildBlockMap(std::function callback) {
676     //more code...
681     leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
682     for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) {
683         FileInfo file_info;
684         bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
689         FileType file_type = GetFileType(file_info.type());
690         if (file_type == kDefault) {
691             //a file
692             for (int i = 0; i < file_info.blocks_size(); i++) {
693                 if (file_info.blocks(i) >= next_block_id_) {
694                     next_block_id_ = file_info.blocks(i) + 1;
695                     block_id_upbound_ = next_block_id_;
696                 }
697                 ++block_num;
698             }
699             ++file_num;
700             if (callback) {
701                 callback(file_info);
702             }
707         }
708     }
709 //more code...
RebuildBlockMapCallbackを行います.
1065 void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) {
1066     for (int i = 0; i < file_info.blocks_size(); i++) {
1067         int64_t block_id = file_info.blocks(i);
1068         int64_t version = file_info.version();
1069         block_mapping_manager_->RebuildBlock(block_id, file_info.replicas(), version, file_info.size());
1071     }   
1072 } 
上記のプロセスは、起動時にleveldb中のメタデータを巡回し、マッピング関係を確立し、last_entry_id_ /next_block_id_などの情報を初期化して、後続の本当にファイルを書く時に新しいblockインデックスを割り当てるためにも使用され、その具体的な役割を紹介します.
以下は具体的にblockの構造です.
 23 struct NSBlock {
 24     int64_t id;
 25     int64_t version; 
 26     std::set replica;
 27     int64_t block_size;
 28     uint32_t expect_replica_num;
 29     RecoverStat recover_stat;                 
 30     std::set incomplete_replica;
 31     NSBlock();
 32     NSBlock(int64_t block_id, int32_t replica, int64_t version, int64_t size);
 33     bool operatorreplica.size() >= b.replica.size());
 35     }   
 36 };

 48 enum RecoverStat { // block   
 49     kNotInRecover = 0;
 50     kLoRecover = 1;
 51     kHiRecover = 2;
 52     kCheck = 3;
 53     kIncomplete = 4;
 54     kLost = 5;
 55     kBlockWriting = 6;
 56     kAny = 20;
 57 }

105 void BlockMapping::RebuildBlock(int64_t block_id, int32_t replica,
106                                 int64_t version, int64_t size) {
107     NSBlock* nsblock = NULL;
108     nsblock = new NSBlock(block_id, replica, version, size);
109     if (size) {//           size    
110         nsblock->recover_stat = kLost;
111         lost_blocks_.insert(block_id);
112     } else {                   
113         nsblock->recover_stat = kBlockWriting;
114     }
121     
123     MutexLock lock(&mu_);
125     std::pair<:iterator bool=""> ret =
126         block_map_.insert(std::make_pair(block_id, nsblock));
129 }
ここで、nsblockは初期化プロセスにおいてrecover_stat(version < 0 ? kBlockWriting : kNotInRecover)、バージョン番号でブロックがどのような状態にあるかを決定し、後にsizeに従ってこの状態をリセットするので、ちょっと分かりません.
一つのblockの状態はいくつかありますが、それぞれどのような状況で対応の状態を設定しますか?例えばバージョン番号が違っています.サイズが違っています.同期されていません.後は具体的に書類を書く時に分析します.BlockMappingに残された変数の役割は、後に分析され、後に分析されたブロックの更新/修復プロセスである.
 58 class BlockMapping {
 59 public:
 68     bool UpdateBlockInfo(int64_t block_id, int32_t server_id, int64_t block_size, int64_t block_version);
 84 private:
 85     void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id);
 86     typedef std::map > CheckList;
 89     void TryRecover(NSBlock* block);
104 private:
105     Mutex mu_;
106     ThreadPool* thread_pool_;
107     typedef std::map NSBlockMap;
108     NSBlockMap block_map_;
109 
110     CheckList hi_recover_check_;
111     CheckList lo_recover_check_;
112     CheckList incomplete_;
113     std::set lo_pri_recover_;
114     std::set hi_pri_recover_;
115     std::set lost_blocks_;
116 };
FileLockManagerを使用して実装されている分析は、その場所から始めたほうがいいと思います.NameServerImpl::CreateFileにおいて、マルチスレッドのプロセスであり、複数スレッドのディレクトリが同じ時刻に操作されている可能性があります.ファイルの作成やディレクトリの削除など、異なるスレッドに配信されるとデータが一致しなくなります.なぜこのようにしますか?およびロックの粒度:
 369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
 370                                 const CreateFileRequest* request,
 371                                 CreateFileResponse* response,
 372                                 ::google::protobuf::Closure* done) {
 373     //more code...
 389     FileLockGuard file_lock(new WriteLock(path));
 390     StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remov     e, &log);
 391     //more code...
以下は、読み書きロックの種類の声明であり、経路データのみが含まれています.本当のロックではなく、最近はfile_lock_manager_に関連するメンバー関数を呼び出します.
 19 class Lock {
 20 public:
 21     virtual ~Lock() {}
 22 };  
 23 
 24 class WriteLock : public Lock {
 25 public:
 26     WriteLock(const std::string& file_path);
 27     WriteLock(const std::string& file_path_a,
 28               const std::string& file_path_b);//       ,   ,         ,     ;
 29     ~WriteLock();
 30     static void SetFileLockManager(FileLockManager* file_lock_manager);
 31 private:
 32     // will be initialized in NameServerImpl's constructor
 33     static FileLockManager* file_lock_manager_;
 34     std::vector<:string> file_path_;
 35 };  
 36 
 37 class ReadLock : public Lock {
 38 public:
 39     ReadLock(const std::string& file_path);
 40     ~ReadLock();
 41     static void SetFileLockManager(FileLockManager* file_lock_manager);
 42 private:
 43     // will be initialized in NameServerImpl's constructor
 44     static FileLockManager* file_lock_manager_;
 45     std::string file_path_; 
 46 };

 16 WriteLock::WriteLock(const std::string& file_path) {
 17     file_path_.push_back(file_path);
 18     file_lock_manager_->WriteLock(file_path);
 19 }

 40 WriteLock::~WriteLock() {
 41     if (file_path_.size() == 1) {
 42         file_lock_manager_->Unlock(file_path_[0]);
 43     } else {//     
 44         file_lock_manager_->Unlock(file_path_[1]);
 45         file_lock_manager_->Unlock(file_path_[0]);
 46     }
 47 }
 52 
 53 ReadLock::ReadLock(const std::string& file_path) {
 54     file_path_ = file_path;
 55     file_lock_manager_->ReadLock(file_path);
 56 }
 57 
 58 ReadLock::~ReadLock() {
 59     file_lock_manager_->Unlock(file_path_);
 60 }
 21 class FileLockManager {
 22 public:
 23     FileLockManager(int bucket_num = 19);
 24     ~FileLockManager();
 25     void ReadLock(const std::string& file_path);
 26     void WriteLock(const std::string& file_path);
 27     void Unlock(const std::string& file_path);
 28 private:
 29     enum LockType {
 30         kRead,
 31         kWrite
 32     };
 33     struct LockEntry {
 34         common::Counter ref_;
 35         common::RWLock rw_lock_;
 36     };
 37     struct LockBucket {
 38         Mutex mu;
 39         std::unordered_map<:string lockentry=""> lock_map;
 40     };
 41     void LockInternal(const std::string& path, LockType lock_type);
 42     void UnlockInternal(const std::string& path);
 43     int GetBucketOffset(const std::string& path);
 44 private:
 45     std::vector locks_;
 46 };
もし実現中なら、パスにロックをかけます.実はここでは一級ごとにロックをかけます.「最後のレベルのファイルに書き込みロックをかけます.前のすべてのディレクトリに読み取りロックをかけます.ロックをかける時は、ルートディレクトリから下級のディレクトリにロックをかけてみます.また、読み書きロックをする時は、書き込みを優先して、ディレクトリ操作の飢餓を防ぐことができます.」レンameにおいては二つの異なる経路が関与しているため、デッドロックを防止するためにレンameが動作する場合は、経路の辞書順に従ってロックをかけます.
上記のCreateFileのように、ファイルまたはディレクトリを変更するメタデータについては、WriteLockを追加し、最終的に:
 46 void FileLockManager::WriteLock(const std::string& file_path) {
 48     std::vector<:string> paths;
 49     common::SplitString(file_path, "/", &paths);//     
 50     // first lock "/"
 51     if (paths.size() == 0) { //       
 52         LockInternal("/", kWrite);
 53         return;
 54     }
 55     LockInternal("/", kRead);//         
 56     std::string cur_path;
 57     for (size_t i = 0; i < paths.size() - 1; i++) {
 58         cur_path += ("/" + paths[i]);
 59         LockInternal(cur_path, kRead);//     
 60     }
 61     cur_path += ("/" + paths.back()); //   
 62     LockInternal(cur_path, kWrite);
 63 }
以上の実装は、ディレクトリがあるかどうかを判断し、SplitStringによって分割され、ない場合はルートディレクトリに書き込みロックをかけて戻ります.そうでない場合は、ルートディレクトリに読み取りロックをかけてから、各階層のディレクトリの所在経路に読み取りロックをかけて、最後に全体のパス、すなわちファイル名の絶対パスに書き込みロックをかけます.
 83 void FileLockManager::LockInternal(const std::string& path,
 84                                      LockType lock_type) {
 85     LockEntry* entry = NULL;
 86 
 87     int bucket_offset = GetBucketOffset(path);//hash path
 88     LockBucket* lock_bucket = locks_[bucket_offset];
 89 
 90     {
 91         MutexLock lock(&(lock_bucket->mu));
 92         auto it = lock_bucket->lock_map.find(path);
 93         if (it == lock_bucket->lock_map.end()) {
 94             entry = new LockEntry();
 95             // hold a ref for lock_map_
 96             entry->ref_.Inc();
 97             lock_bucket->lock_map.insert(std::make_pair(path, entry));
 98         } else {
 99             entry = it->second;
100         }
101         // inc ref_ first to prevent deconstruct
102         entry->ref_.Inc();
103     }
104 
105     if (lock_type == kRead) {
106         // get read lock
107         entry->rw_lock_.ReadLock();
108     } else {
109         // get write lock
110         entry->rw_lock_.WriteLock();
111     }
112 }

 33 void FileLockManager::ReadLock(const std::string& file_path) {
 35     std::vector<:string> paths;
 36     common::SplitString(file_path, "/", &paths);
 37     // first lock "/"
 38     LockInternal("/", kRead);
 39     std::string cur_path;
 40     for (size_t i = 0; i < paths.size(); i++) {
 41         cur_path += ("/" + paths[i]);
 42         LockInternal(cur_path, kRead);
 43     }
 44 }
以上は具体的なアンロック動作プロセスであり、まずpathハッシュからvectorのどれに/を入力してから、あるLockBucketに具体的に言及して、ない場合は参照カウント1を構築して設定し、その後カウントに1を追加します.続いて読みますか?それとも書き込みロックがあるかを判断します.
チャックをかけると書き込みロックをする部分がほぼ同じになりますので、分析しません.
以下にロック解除時のプロセスを割り当てます.
 65 void FileLockManager::Unlock(const std::string& file_path) {
 66     /// TODO maybe use NormalizePath is better
 67     std::vector<:string> paths;
 68     common::SplitString(file_path, "/", &paths);
 69     std::string path;
 70     for (size_t i = 0; i < paths.size(); i++) {
 71         path += ("/" + paths[i]);
 72     }
 74     std::string cur_path = path;
 75     for (size_t i = 0; i < paths.size() ; i++) {
 76         UnlockInternal(cur_path);
 77         cur_path.resize(cur_path.find_last_of('/'));
 78     }
 79     // last unlock "/"
 80     UnlockInternal("/");
 81 }

114 void FileLockManager::UnlockInternal(const std::string& path) {
115     int bucket_offset = GetBucketOffset(path);
116     LockBucket* lock_bucket = locks_[bucket_offset];
117 
118     MutexLock lock(&(lock_bucket->mu));
119     auto it = lock_bucket->lock_map.find(path);
120     assert(it != lock_bucket->lock_map.end());
121     LockEntry* entry = it->second;
122     // release lock
123     entry->rw_lock_.Unlock();
124     if (entry->ref_.Dec() == 1) {
125         // we are the last holder
126         /// TODO maybe don't need to deconstruct immediately
127         delete entry;
128         lock_bucket->lock_map.erase(it);
129     }
130 }
引用「あるレベルのディレクトリまたはファイルの操作が完了したらロックを解除する必要があります.リリース時には、まずリード・書き込み・ロックを解除し、LockEntryの参照カウントを1減らします.参照カウントを0に減らすと、LockEntryの解析をトリガします.」
以上の実装は、接続中に説明されたように、性能に問題がある可能性があり、参照:「各ファイルのロックが相対的に独立していることを考慮して、LockEntryにおけるマッピング情報をドラム缶化することができる.
ほとんどの場合、同じファイルのメタデータが同時に操作されることはないので、ロックをかけたり、ロックを解除したりすると、FileLockManager構造の構造や解析を引き起こしたり、cacheに補佐したりして、リリースが必要なLockEntryを先にキャッシュして、最終的なリリースに責任を負わせたりします.新規申請LockEntryが必要な場合、直接にキャッシュから一つのリコールを行います.カーネル中のslabのようなやり方ができます.」
TODO分析ブロックの修復、更新プロセス、同期、次の分析.
bfs fileku