Baiduファイルシステムbfsソース分析シリーズ(二)
15279 ワード
書き込みプロセス(中)
この記事はまだ書き込みプロセスを分析していますが、多くのデータ構造の実現に関連しています.ここではまず2つのクラス
nameserverが起動すると、構造関数
以下は具体的にblockの構造です.
一つのblockの状態はいくつかありますが、それぞれどのような状況で対応の状態を設定しますか?例えばバージョン番号が違っています.サイズが違っています.同期されていません.後は具体的に書類を書く時に分析します.
上記の
チャックをかけると書き込みロックをする部分がほぼ同じになりますので、分析しません.
以下にロック解除時のプロセスを割り当てます.
以上の実装は、接続中に説明されたように、性能に問題がある可能性があり、参照:「各ファイルのロックが相対的に独立していることを考慮して、
ほとんどの場合、同じファイルのメタデータが同時に操作されることはないので、ロックをかけたり、ロックを解除したりすると、
TODO分析ブロックの修復、更新プロセス、同期、次の分析.
bfs fileku
この記事はまだ書き込みプロセスを分析していますが、多くのデータ構造の実現に関連しています.ここではまず2つのクラス
BlockMappingManager/FileLockManager
を分析します.一つはblockメタデータを管理するクラスで、一つはロックの実現です.BlockMappingManager
はBlockMappingの管理クラスであり、後者は具体的なblock管理クラスであり、nameserver起動時にコマンドラインパラメータでFLAGS_blockmapping_bucket_num
を指定する. 16 class BlockMappingManager {
17 public :
18 BlockMappingManager(int32_t bucket_num);
19 ~BlockMappingManager();
46 private:
47 int32_t blockmapping_bucket_num_;
48 std::vector block_mapping_;
49 ThreadPool* thread_pool_;
50 };
20 BlockMappingManager::BlockMappingManager(int32_t bucket_num) :
21 blockmapping_bucket_num_(bucket_num) {
22 thread_pool_ = new ThreadPool(FLAGS_blockmapping_working_thread_num);
23 block_mapping_.resize(blockmapping_bucket_num_);
24 for (size_t i = 0; i < block_mapping_.size(); i++) {
25 block_mapping_[i] = new BlockMapping(thread_pool_);
26 }
27 srand(time(NULL));
28 }
33 int32_t BlockMappingManager::GetBucketOffset(int64_t block_id) {
34 return block_id % blockmapping_bucket_num_;
35 }
このクラスのすべてのインターフェースは、block_id
を使用することによって、最後にGetBucketOffset
を呼び出して、どの特定のBlockMapping
に位置するかを特定する.nameserverが起動すると、構造関数
NameServerImpl::NameServerImpl
で行われます. 93 void NameServerImpl::CheckLeader() {
94 if (!sync_ || sync_->IsLeader()) {
96 NameServerLog log;
97 std::function task =
98 std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1);
99 namespace_->Activate(task, &log);
100 if (!LogRemote(log, std::function())) {
102 }
103 recover_timeout_ = FLAGS_nameserver_start_recover_timeout;
104 start_time_ = common::timer::get_micros();
105 work_thread_pool_->DelayTask(1000, std::bind(&NameServerImpl::CheckRecoverMode, this));
106 is_leader_ = true;
107 } else {
108 is_leader_ = false;
109 work_thread_pool_->DelayTask(100, std::bind(&NameServerImpl::CheckLeader, this));
111 }
112 }
namespace_->Activate
において、 52 void NameSpace::Activate(std::function callback, NameServerLog* log) {
53 //more code...
75 SetupRoot();
76 RebuildBlockMap(callback);
77 InitBlockIdUpbound(log);
78 }
675 bool NameSpace::RebuildBlockMap(std::function callback) {
676 //more code...
681 leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
682 for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) {
683 FileInfo file_info;
684 bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
689 FileType file_type = GetFileType(file_info.type());
690 if (file_type == kDefault) {
691 //a file
692 for (int i = 0; i < file_info.blocks_size(); i++) {
693 if (file_info.blocks(i) >= next_block_id_) {
694 next_block_id_ = file_info.blocks(i) + 1;
695 block_id_upbound_ = next_block_id_;
696 }
697 ++block_num;
698 }
699 ++file_num;
700 if (callback) {
701 callback(file_info);
702 }
707 }
708 }
709 //more code...
RebuildBlockMapCallback
を行います.1065 void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) {
1066 for (int i = 0; i < file_info.blocks_size(); i++) {
1067 int64_t block_id = file_info.blocks(i);
1068 int64_t version = file_info.version();
1069 block_mapping_manager_->RebuildBlock(block_id, file_info.replicas(), version, file_info.size());
1071 }
1072 }
上記のプロセスは、起動時にleveldb中のメタデータを巡回し、マッピング関係を確立し、last_entry_id_ /next_block_id_
などの情報を初期化して、後続の本当にファイルを書く時に新しいblockインデックスを割り当てるためにも使用され、その具体的な役割を紹介します.以下は具体的にblockの構造です.
23 struct NSBlock {
24 int64_t id;
25 int64_t version;
26 std::set replica;
27 int64_t block_size;
28 uint32_t expect_replica_num;
29 RecoverStat recover_stat;
30 std::set incomplete_replica;
31 NSBlock();
32 NSBlock(int64_t block_id, int32_t replica, int64_t version, int64_t size);
33 bool operatorreplica.size() >= b.replica.size());
35 }
36 };
48 enum RecoverStat { // block
49 kNotInRecover = 0;
50 kLoRecover = 1;
51 kHiRecover = 2;
52 kCheck = 3;
53 kIncomplete = 4;
54 kLost = 5;
55 kBlockWriting = 6;
56 kAny = 20;
57 }
105 void BlockMapping::RebuildBlock(int64_t block_id, int32_t replica,
106 int64_t version, int64_t size) {
107 NSBlock* nsblock = NULL;
108 nsblock = new NSBlock(block_id, replica, version, size);
109 if (size) {// size
110 nsblock->recover_stat = kLost;
111 lost_blocks_.insert(block_id);
112 } else {
113 nsblock->recover_stat = kBlockWriting;
114 }
121
123 MutexLock lock(&mu_);
125 std::pair<:iterator bool=""> ret =
126 block_map_.insert(std::make_pair(block_id, nsblock));
129 }
ここで、nsblockは初期化プロセスにおいてrecover_stat(version < 0 ? kBlockWriting : kNotInRecover)
、バージョン番号でブロックがどのような状態にあるかを決定し、後にsizeに従ってこの状態をリセットするので、ちょっと分かりません.一つのblockの状態はいくつかありますが、それぞれどのような状況で対応の状態を設定しますか?例えばバージョン番号が違っています.サイズが違っています.同期されていません.後は具体的に書類を書く時に分析します.
BlockMapping
に残された変数の役割は、後に分析され、後に分析されたブロックの更新/修復プロセスである. 58 class BlockMapping {
59 public:
68 bool UpdateBlockInfo(int64_t block_id, int32_t server_id, int64_t block_size, int64_t block_version);
84 private:
85 void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id);
86 typedef std::map > CheckList;
89 void TryRecover(NSBlock* block);
104 private:
105 Mutex mu_;
106 ThreadPool* thread_pool_;
107 typedef std::map NSBlockMap;
108 NSBlockMap block_map_;
109
110 CheckList hi_recover_check_;
111 CheckList lo_recover_check_;
112 CheckList incomplete_;
113 std::set lo_pri_recover_;
114 std::set hi_pri_recover_;
115 std::set lost_blocks_;
116 };
FileLockManager
を使用して実装されている分析は、その場所から始めたほうがいいと思います.NameServerImpl::CreateFile
において、マルチスレッドのプロセスであり、複数スレッドのディレクトリが同じ時刻に操作されている可能性があります.ファイルの作成やディレクトリの削除など、異なるスレッドに配信されるとデータが一致しなくなります.なぜこのようにしますか?およびロックの粒度: 369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
370 const CreateFileRequest* request,
371 CreateFileResponse* response,
372 ::google::protobuf::Closure* done) {
373 //more code...
389 FileLockGuard file_lock(new WriteLock(path));
390 StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remov e, &log);
391 //more code...
以下は、読み書きロックの種類の声明であり、経路データのみが含まれています.本当のロックではなく、最近はfile_lock_manager_
に関連するメンバー関数を呼び出します. 19 class Lock {
20 public:
21 virtual ~Lock() {}
22 };
23
24 class WriteLock : public Lock {
25 public:
26 WriteLock(const std::string& file_path);
27 WriteLock(const std::string& file_path_a,
28 const std::string& file_path_b);// , , , ;
29 ~WriteLock();
30 static void SetFileLockManager(FileLockManager* file_lock_manager);
31 private:
32 // will be initialized in NameServerImpl's constructor
33 static FileLockManager* file_lock_manager_;
34 std::vector<:string> file_path_;
35 };
36
37 class ReadLock : public Lock {
38 public:
39 ReadLock(const std::string& file_path);
40 ~ReadLock();
41 static void SetFileLockManager(FileLockManager* file_lock_manager);
42 private:
43 // will be initialized in NameServerImpl's constructor
44 static FileLockManager* file_lock_manager_;
45 std::string file_path_;
46 };
16 WriteLock::WriteLock(const std::string& file_path) {
17 file_path_.push_back(file_path);
18 file_lock_manager_->WriteLock(file_path);
19 }
40 WriteLock::~WriteLock() {
41 if (file_path_.size() == 1) {
42 file_lock_manager_->Unlock(file_path_[0]);
43 } else {//
44 file_lock_manager_->Unlock(file_path_[1]);
45 file_lock_manager_->Unlock(file_path_[0]);
46 }
47 }
52
53 ReadLock::ReadLock(const std::string& file_path) {
54 file_path_ = file_path;
55 file_lock_manager_->ReadLock(file_path);
56 }
57
58 ReadLock::~ReadLock() {
59 file_lock_manager_->Unlock(file_path_);
60 }
21 class FileLockManager {
22 public:
23 FileLockManager(int bucket_num = 19);
24 ~FileLockManager();
25 void ReadLock(const std::string& file_path);
26 void WriteLock(const std::string& file_path);
27 void Unlock(const std::string& file_path);
28 private:
29 enum LockType {
30 kRead,
31 kWrite
32 };
33 struct LockEntry {
34 common::Counter ref_;
35 common::RWLock rw_lock_;
36 };
37 struct LockBucket {
38 Mutex mu;
39 std::unordered_map<:string lockentry=""> lock_map;
40 };
41 void LockInternal(const std::string& path, LockType lock_type);
42 void UnlockInternal(const std::string& path);
43 int GetBucketOffset(const std::string& path);
44 private:
45 std::vector locks_;
46 };
もし実現中なら、パスにロックをかけます.実はここでは一級ごとにロックをかけます.「最後のレベルのファイルに書き込みロックをかけます.前のすべてのディレクトリに読み取りロックをかけます.ロックをかける時は、ルートディレクトリから下級のディレクトリにロックをかけてみます.また、読み書きロックをする時は、書き込みを優先して、ディレクトリ操作の飢餓を防ぐことができます.」レンameにおいては二つの異なる経路が関与しているため、デッドロックを防止するためにレンameが動作する場合は、経路の辞書順に従ってロックをかけます.上記の
CreateFile
のように、ファイルまたはディレクトリを変更するメタデータについては、WriteLock
を追加し、最終的に: 46 void FileLockManager::WriteLock(const std::string& file_path) {
48 std::vector<:string> paths;
49 common::SplitString(file_path, "/", &paths);//
50 // first lock "/"
51 if (paths.size() == 0) { //
52 LockInternal("/", kWrite);
53 return;
54 }
55 LockInternal("/", kRead);//
56 std::string cur_path;
57 for (size_t i = 0; i < paths.size() - 1; i++) {
58 cur_path += ("/" + paths[i]);
59 LockInternal(cur_path, kRead);//
60 }
61 cur_path += ("/" + paths.back()); //
62 LockInternal(cur_path, kWrite);
63 }
以上の実装は、ディレクトリがあるかどうかを判断し、SplitString
によって分割され、ない場合はルートディレクトリに書き込みロックをかけて戻ります.そうでない場合は、ルートディレクトリに読み取りロックをかけてから、各階層のディレクトリの所在経路に読み取りロックをかけて、最後に全体のパス、すなわちファイル名の絶対パスに書き込みロックをかけます. 83 void FileLockManager::LockInternal(const std::string& path,
84 LockType lock_type) {
85 LockEntry* entry = NULL;
86
87 int bucket_offset = GetBucketOffset(path);//hash path
88 LockBucket* lock_bucket = locks_[bucket_offset];
89
90 {
91 MutexLock lock(&(lock_bucket->mu));
92 auto it = lock_bucket->lock_map.find(path);
93 if (it == lock_bucket->lock_map.end()) {
94 entry = new LockEntry();
95 // hold a ref for lock_map_
96 entry->ref_.Inc();
97 lock_bucket->lock_map.insert(std::make_pair(path, entry));
98 } else {
99 entry = it->second;
100 }
101 // inc ref_ first to prevent deconstruct
102 entry->ref_.Inc();
103 }
104
105 if (lock_type == kRead) {
106 // get read lock
107 entry->rw_lock_.ReadLock();
108 } else {
109 // get write lock
110 entry->rw_lock_.WriteLock();
111 }
112 }
33 void FileLockManager::ReadLock(const std::string& file_path) {
35 std::vector<:string> paths;
36 common::SplitString(file_path, "/", &paths);
37 // first lock "/"
38 LockInternal("/", kRead);
39 std::string cur_path;
40 for (size_t i = 0; i < paths.size(); i++) {
41 cur_path += ("/" + paths[i]);
42 LockInternal(cur_path, kRead);
43 }
44 }
以上は具体的なアンロック動作プロセスであり、まずpathハッシュからvectorのどれに/
を入力してから、あるLockBucket
に具体的に言及して、ない場合は参照カウント1を構築して設定し、その後カウントに1を追加します.続いて読みますか?それとも書き込みロックがあるかを判断します.チャックをかけると書き込みロックをする部分がほぼ同じになりますので、分析しません.
以下にロック解除時のプロセスを割り当てます.
65 void FileLockManager::Unlock(const std::string& file_path) {
66 /// TODO maybe use NormalizePath is better
67 std::vector<:string> paths;
68 common::SplitString(file_path, "/", &paths);
69 std::string path;
70 for (size_t i = 0; i < paths.size(); i++) {
71 path += ("/" + paths[i]);
72 }
74 std::string cur_path = path;
75 for (size_t i = 0; i < paths.size() ; i++) {
76 UnlockInternal(cur_path);
77 cur_path.resize(cur_path.find_last_of('/'));
78 }
79 // last unlock "/"
80 UnlockInternal("/");
81 }
114 void FileLockManager::UnlockInternal(const std::string& path) {
115 int bucket_offset = GetBucketOffset(path);
116 LockBucket* lock_bucket = locks_[bucket_offset];
117
118 MutexLock lock(&(lock_bucket->mu));
119 auto it = lock_bucket->lock_map.find(path);
120 assert(it != lock_bucket->lock_map.end());
121 LockEntry* entry = it->second;
122 // release lock
123 entry->rw_lock_.Unlock();
124 if (entry->ref_.Dec() == 1) {
125 // we are the last holder
126 /// TODO maybe don't need to deconstruct immediately
127 delete entry;
128 lock_bucket->lock_map.erase(it);
129 }
130 }
引用「あるレベルのディレクトリまたはファイルの操作が完了したらロックを解除する必要があります.リリース時には、まずリード・書き込み・ロックを解除し、LockEntry
の参照カウントを1減らします.参照カウントを0に減らすと、LockEntry
の解析をトリガします.」以上の実装は、接続中に説明されたように、性能に問題がある可能性があり、参照:「各ファイルのロックが相対的に独立していることを考慮して、
LockEntry
におけるマッピング情報をドラム缶化することができる.ほとんどの場合、同じファイルのメタデータが同時に操作されることはないので、ロックをかけたり、ロックを解除したりすると、
FileLockManager
構造の構造や解析を引き起こしたり、cacheに補佐したりして、リリースが必要なLockEntry
を先にキャッシュして、最終的なリリースに責任を負わせたりします.新規申請LockEntry
が必要な場合、直接にキャッシュから一つのリコールを行います.カーネル中のslabのようなやり方ができます.」TODO分析ブロックの修復、更新プロセス、同期、次の分析.
bfs fileku