高可用性・高性能・スレッドセーフ・自動データリカバリ対応の redo log クラス


金融や電子商取引など、高可用性が求められデータの損失が許されない環境では、redo log は低コストで高可用性を実現できる優れた方式であり、その役割は MySQL の binlog に似ています。以下は、本番環境での運用と最適化を経た、マルチスレッドセーフで自動リカバリ機能を備えた C++ の redo log クラスです。実際の運用で直面した問題に対応するため多くの箇所で設計を最適化し、高性能と信頼性を最大限に確保しています。
/******************************************************
function: redo log used to recover data after a service restart,
		  a system crash or an exception.
author: liuyi
date:2016.08.26
version:1.0
******************************************************/

#ifndef REDO_LOG_H
#define REDO_LOG_H

#include <dirent.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>

#include <fstream>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using namespace std;

/**
 * Book-keeping for one "<prefix>.input" / "<prefix>.output" file pair.
 */
struct redo_file_pair_info
{
	int file_max_lines;    // line count at which the pair is considered complete
	int file_input_lines;  // records accounted to the .input file so far
	int file_output_lines; // records acknowledged in the .output file so far
	FILE *output_log_ptr;  // handle of the .output file; NULL once redo_log has closed it
};

/**
 * One record that was written to an .input file but has no matching line in
 * the corresponding .output file, as returned by
 * redo_log::reload_unfinish_records().
 */
struct reload_data
{
	std::string file_prefix; // file pair the record belongs to
	std::string uid;         // text before the record separator
	std::string log_str;     // text after the record separator
};

/**
 * File based redo log.
 *
 * Every request is appended to "<path>/<prefix>.input" before it is processed
 * and to "<path>/<prefix>.output" once it is done. A pair whose input and
 * output line counts both reached the configured maximum is deleted by a
 * background thread. After a restart, reload_unfinish_records() returns the
 * input lines that have no identical output line.
 *
 * A line has the form  uid \x01\x02\x03 payload \n , and matching between the
 * two files is done on the whole line, so the payload passed to
 * write_output_log() must equal the one passed to write_input_log().
 *
 * All state shared with the background thread is protected by one mutex.
 * Instances cannot be copied.
 *
 * NOTE(review): records are written with buffered stdio and are only flushed
 * on rotation, when a pair completes and in the destructor. Confirm that this
 * matches the durability the callers expect.
 */
class redo_log
{
	public:
		/**
		 * Creates an idle log; init() must be called before writing.
		 * Blocks for one second (see below).
		 */
		redo_log()
			: m_rm_thread(),
			  m_rm_thread_started(false),
			  m_stop_requested(false),
			  m_start_time(0),
			  m_uuid(0),
			  m_input_log_lines(0),
			  m_max_log_lines(100000),
			  m_rm_finish_log_time(5),
			  m_input_log(NULL),
			  m_log_version(0)
		{
			pthread_mutex_init(&m_mutex, NULL);
			pthread_cond_init(&m_stop_cond, NULL);

			// The start time (in seconds) is part of every file name and uid.
			// Waiting one second guarantees that an immediately restarted
			// process gets a different value than its predecessor.
			sleep(1);
			m_start_time = time(NULL);
			m_current_input_prefix = make_prefix(m_log_version);
		}

		/**
		 * Stops and joins the background thread, then closes every open file.
		 */
		~redo_log()
		{
			stop_cleanup_thread();

			if(m_input_log != NULL)
			{
				fclose(m_input_log);
				m_input_log = NULL;
			}
			for(std::map<std::string, redo_file_pair_info>::iterator it = m_file_pair_info_map.begin();
				it != m_file_pair_info_map.end(); ++it)
			{
				if(it->second.output_log_ptr != NULL)
				{
					fclose(it->second.output_log_ptr);
					it->second.output_log_ptr = NULL;
				}
			}

			pthread_cond_destroy(&m_stop_cond);
			pthread_mutex_destroy(&m_mutex);
		}

		/**
		 * Opens the first input/output file pair in redo_log_path and starts
		 * the background thread that deletes completed pairs.
		 *
		 * @param redo_log_path      existing directory that holds the log files
		 * @param max_log_lines      lines per input file before a new pair is started
		 * @param rm_finish_log_time seconds between two clean-up runs
		 * @return false if an input file is already open, if a file cannot be
		 *         created or if the thread cannot be started
		 */
		bool init(const std::string& redo_log_path, int max_log_lines, int rm_finish_log_time = 5)
		{
			pthread_mutex_lock(&m_mutex);
			if(m_input_log != NULL)
			{
				// Re-opening with "w" would truncate the log that is in use.
				pthread_mutex_unlock(&m_mutex);
				return false;
			}

			m_path = redo_log_path;
			m_max_log_lines = max_log_lines;
			m_rm_finish_log_time = rm_finish_log_time;

			bool ok = open_file_pair_locked(make_prefix(m_log_version));
			if(ok)
			{
				++m_log_version;
				if(!m_rm_thread_started)
				{
					ok = (0 == pthread_create(&m_rm_thread, NULL, delete_finish_file_thread, this));
					m_rm_thread_started = ok;
				}
			}
			pthread_mutex_unlock(&m_mutex);
			return ok;
		}

		/**
		 * Returns a new id of the form "<start time>_<counter>"; the counter
		 * starts at 0 and grows by one per call.
		 */
		std::string get_uuid()
		{
			char uid[128] = {0};
			pthread_mutex_lock(&m_mutex);
			snprintf(uid, sizeof(uid), "%lu_%lld",
				static_cast<unsigned long>(m_start_time), static_cast<long long>(m_uuid));
			++m_uuid;
			pthread_mutex_unlock(&m_mutex);
			return std::string(uid);
		}

		/**
		 * Returns the prefix of the file pair that currently receives input records.
		 */
		std::string get_current_log_prefix()const
		{
			pthread_mutex_lock(&m_mutex);
			std::string tmp = m_current_input_prefix;
			pthread_mutex_unlock(&m_mutex);
			return tmp;
		}

		/**
		 * Returns the interval, in seconds, between two clean-up runs.
		 */
		int get_rm_finish_log_time()const
		{
			pthread_mutex_lock(&m_mutex);
			const int seconds = m_rm_finish_log_time;
			pthread_mutex_unlock(&m_mutex);
			return seconds;
		}

		/**
		 * Appends the name of every entry of dir (including "." and "..") to
		 * all_file_vect. The working directory of the process is not touched.
		 *
		 * @return false if dir cannot be opened
		 */
		bool get_all_files(const std::string& dir, std::vector<std::string>& all_file_vect)
		{
			DIR *dp = opendir(dir.c_str());
			if(NULL == dp)
			{
				return false;
			}

			struct dirent *entry;
			while((entry = readdir(dp)) != NULL)
			{
				all_file_vect.push_back(entry->d_name);
			}
			closedir(dp);
			return true;
		}

		/**
		 * Inserts every line of file_name into lines. A file that cannot be
		 * opened contributes nothing.
		 *
		 * @return size of lines after loading
		 */
		int load_file(const std::string& file_name, std::set<std::string>& lines)
		{
			std::ifstream infile(file_name.c_str());
			std::string line;
			while(std::getline(infile, line))
			{
				lines.insert(line);
			}
			return static_cast<int>(lines.size());
		}

		/**
		 * Scans the log directory for file pairs that this object does not
		 * track yet (normally the ones left behind by an earlier run).
		 *
		 * For each such pair: if an output file exists and holds as many
		 * distinct lines as the input file, both files are deleted. Otherwise
		 * every input line without an identical output line is appended to
		 * unfinish_records and the pair is registered, so that
		 * write_output_log() can acknowledge those records.
		 *
		 * Pairs that are already tracked, in particular the one opened by
		 * init(), are skipped.
		 *
		 * @return false if the directory cannot be read, a completed pair
		 *         cannot be deleted or an output file cannot be opened
		 */
		bool reload_unfinish_records(std::vector<reload_data>& unfinish_records)
		{
			std::vector<std::string> all_file_names;
			if(!get_all_files(m_path, all_file_names))
				return false;

			std::set<std::string> input_file_set;
			std::set<std::string> output_file_set;
			for(size_t i = 0; i < all_file_names.size(); i++)
			{
				std::string prefix;
				if(strip_suffix(all_file_names[i], ".input", prefix))
					input_file_set.insert(prefix);
				else if(strip_suffix(all_file_names[i], ".output", prefix))
					output_file_set.insert(prefix);
			}

			for(std::set<std::string>::const_iterator it = input_file_set.begin();
				it != input_file_set.end(); ++it)
			{
				const std::string& prefix = *it;
				if(is_tracked(prefix))
					continue;

				const std::string output_path = m_path + "/" + prefix + ".output";
				const bool has_output = output_file_set.count(prefix) != 0;

				std::set<std::string> input_log_lines;
				std::set<std::string> output_log_lines;
				load_file(m_path + "/" + prefix + ".input", input_log_lines);
				if(has_output)
					load_file(output_path, output_log_lines);

				if(has_output && input_log_lines.size() == output_log_lines.size())
				{
					// Everything was acknowledged: the pair is no longer needed.
					if(!remove_pair_files(prefix))
						return false;
					continue;
				}

				// Opened before any record is reported so that a failure
				// leaves unfinish_records untouched for this pair.
				FILE *fp = fopen(output_path.c_str(), "a");
				if(fp == NULL)
					return false;

				for(std::set<std::string>::const_iterator iter = input_log_lines.begin();
					iter != input_log_lines.end(); ++iter)
				{
					if(output_log_lines.count(*iter) != 0)
						continue;

					reload_data record;
					if(split_record(*iter, record.uid, record.log_str))
					{
						record.file_prefix = prefix;
						unfinish_records.push_back(record);
					}
				}

				// The pair counts as complete once as many output lines exist
				// as input lines were found.
				redo_file_pair_info new_file;
				new_file.file_max_lines = static_cast<int>(input_log_lines.size());
				new_file.file_input_lines = static_cast<int>(input_log_lines.size());
				new_file.file_output_lines = static_cast<int>(output_log_lines.size());
				new_file.output_log_ptr = fp;

				pthread_mutex_lock(&m_mutex);
				m_file_pair_info_map.insert(std::make_pair(prefix, new_file));
				pthread_mutex_unlock(&m_mutex);
			}

			return true;
		}

		/**
		 * Appends "uuid \x01\x02\x03 input_log \n" to the current input file
		 * and counts the line for file_prefix. When the current file reaches
		 * the configured maximum a new file pair is started.
		 *
		 * NOTE(review): the line always goes to the *current* input file, also
		 * when file_prefix names an older pair. Callers are presumably expected
		 * to pass get_current_log_prefix(); confirm.
		 *
		 * @return false if file_prefix is unknown, no input file is open or
		 *         the write fails
		 */
		bool write_input_log(const std::string& file_prefix, const std::string& uuid, const std::string& input_log)
		{
			const std::string line = uuid + record_separator() + input_log + "\n";

			pthread_mutex_lock(&m_mutex);
			std::map<std::string, redo_file_pair_info>::iterator it = m_file_pair_info_map.find(file_prefix);
			if(it == m_file_pair_info_map.end()
				|| m_input_log == NULL
				|| fputs(line.c_str(), m_input_log) < 0)
			{
				pthread_mutex_unlock(&m_mutex);
				return false;
			}
			++it->second.file_input_lines;

			if((++m_input_log_lines) == m_max_log_lines)
			{
				rotate_input_log_locked();
			}
			pthread_mutex_unlock(&m_mutex);
			return true;
		}

		/**
		 * Appends "uuid \x01\x02\x03 output_log \n" to the output file of
		 * file_prefix. The file is flushed and closed once it holds the
		 * maximum number of lines of its pair.
		 *
		 * @return false if file_prefix is unknown, its output file has already
		 *         been closed or the write fails
		 */
		bool write_output_log(const std::string& file_prefix, const std::string& uuid, const std::string& output_log)
		{
			const std::string line = uuid + record_separator() + output_log + "\n";

			pthread_mutex_lock(&m_mutex);
			std::map<std::string, redo_file_pair_info>::iterator it = m_file_pair_info_map.find(file_prefix);
			if(it == m_file_pair_info_map.end()
				|| it->second.output_log_ptr == NULL
				|| fputs(line.c_str(), it->second.output_log_ptr) < 0)
			{
				pthread_mutex_unlock(&m_mutex);
				return false;
			}

			if((++it->second.file_output_lines) == it->second.file_max_lines)
			{
				fflush(it->second.output_log_ptr);
				fclose(it->second.output_log_ptr);
				// Cleared so that a late write is rejected instead of using a
				// closed stream.
				it->second.output_log_ptr = NULL;
			}
			pthread_mutex_unlock(&m_mutex);
			return true;
		}

		/**
		 * Forgets every file pair whose input and output line counts both
		 * equal the pair's maximum and deletes its two files. The pair that
		 * currently receives input is never removed.
		 *
		 * @return false if at least one file could not be deleted
		 */
		bool remove_finish_redo_log()
		{
			std::vector<std::string> delete_file_prefix;
			delete_file_prefix.reserve(100);

			pthread_mutex_lock(&m_mutex);
			for(std::map<std::string, redo_file_pair_info>::iterator it = m_file_pair_info_map.begin();
				it != m_file_pair_info_map.end(); )
			{
				redo_file_pair_info& info = it->second;
				const bool is_active = (m_input_log != NULL && it->first == m_current_input_prefix);
				const bool is_finished = info.file_input_lines == info.file_output_lines
					&& info.file_output_lines == info.file_max_lines;
				if(is_active || !is_finished)
				{
					++it;
					continue;
				}

				// A pair registered with zero lines was never closed by
				// write_output_log().
				if(info.output_log_ptr != NULL)
				{
					fclose(info.output_log_ptr);
					info.output_log_ptr = NULL;
				}
				delete_file_prefix.push_back(it->first);
				m_file_pair_info_map.erase(it++);
			}
			pthread_mutex_unlock(&m_mutex);

			// Files are deleted outside the lock so writers are not blocked
			// by file system work.
			bool all_removed = true;
			for(size_t i = 0; i < delete_file_prefix.size(); i++)
			{
				if(!remove_pair_files(delete_file_prefix[i]))
					all_removed = false;
			}
			return all_removed;
		}

		/**
		 * Entry point of the background thread: calls remove_finish_redo_log()
		 * once per interval until the owning object is destroyed.
		 *
		 * @param arg the redo_log instance that started the thread
		 */
		static void* delete_finish_file_thread(void *arg)
		{
			redo_log *p = static_cast<redo_log*>(arg);
			while(p->wait_for_next_sweep())
			{
				p->remove_finish_redo_log();
			}
			return NULL;
		}

	private:
		enum { SEPARATOR_LENGTH = 3 };

		// Copying would duplicate FILE handles and the thread handle.
		redo_log(const redo_log&);
		redo_log& operator=(const redo_log&);

		// Byte sequence between the uid and the payload of a line.
		static const char* record_separator()
		{
			return "\x01\x02\x03";
		}

		// Splits "uid<separator>payload"; returns false when the separator is missing.
		static bool split_record(const std::string& line, std::string& uid, std::string& log_str)
		{
			const std::string::size_type index = line.find(record_separator());
			if(index == std::string::npos)
				return false;
			uid = line.substr(0, index);
			log_str = line.substr(index + SEPARATOR_LENGTH);
			return true;
		}

		// If name ends with suffix and is longer than it, stores the leading part in stem.
		static bool strip_suffix(const std::string& name, const std::string& suffix, std::string& stem)
		{
			if(name.size() <= suffix.size())
				return false;
			const std::string::size_type stem_length = name.size() - suffix.size();
			if(name.compare(stem_length, suffix.size(), suffix) != 0)
				return false;
			stem = name.substr(0, stem_length);
			return true;
		}

		// Deletes file; a file that does not exist counts as deleted.
		static bool remove_if_exists(const std::string& file)
		{
			return 0 == ::remove(file.c_str()) || ENOENT == errno;
		}

		// Deletes "<prefix>.input" and "<prefix>.output" in the log directory.
		bool remove_pair_files(const std::string& prefix)const
		{
			const std::string base = m_path + "/" + prefix;
			const bool input_removed = remove_if_exists(base + ".input");
			const bool output_removed = remove_if_exists(base + ".output");
			return input_removed && output_removed;
		}

		// Builds "<start time>_<version>", the name stem of a file pair.
		std::string make_prefix(int64_t version)const
		{
			char tmp[64] = {0};
			snprintf(tmp, sizeof(tmp), "%lu_%lld",
				static_cast<unsigned long>(m_start_time), static_cast<long long>(version));
			return std::string(tmp);
		}

		// Tells whether prefix is already present in the pair map.
		bool is_tracked(const std::string& prefix)const
		{
			pthread_mutex_lock(&m_mutex);
			const bool tracked = m_file_pair_info_map.count(prefix) != 0;
			pthread_mutex_unlock(&m_mutex);
			return tracked;
		}

		// Creates both files of a new pair and makes it the current one.
		// Nothing changes on failure. The caller holds m_mutex.
		bool open_file_pair_locked(const std::string& prefix)
		{
			const std::string base = m_path + "/" + prefix;
			FILE *input = fopen((base + ".input").c_str(), "w");
			if(input == NULL)
				return false;

			FILE *output = fopen((base + ".output").c_str(), "w");
			if(output == NULL)
			{
				fclose(input);
				::remove((base + ".input").c_str());
				return false;
			}

			redo_file_pair_info new_file;
			new_file.file_max_lines = m_max_log_lines;
			new_file.file_input_lines = 0;
			new_file.file_output_lines = 0;
			new_file.output_log_ptr = output;
			m_file_pair_info_map.insert(std::make_pair(prefix, new_file));

			m_input_log = input;
			m_current_input_prefix = prefix;
			m_input_log_lines = 0;
			return true;
		}

		// Closes the full input file and starts the next pair. If the new
		// files cannot be created no input file stays open, so further
		// write_input_log() calls fail. The caller holds m_mutex.
		void rotate_input_log_locked()
		{
			fflush(m_input_log);
			fclose(m_input_log);
			m_input_log = NULL;

			if(open_file_pair_locked(make_prefix(m_log_version)))
				++m_log_version;
		}

		// Blocks until the clean-up interval elapsed or a stop was requested.
		// Returns false when the thread has to terminate. Intervals below one
		// second are treated as one second.
		bool wait_for_next_sweep()
		{
			pthread_mutex_lock(&m_mutex);
			struct timespec deadline;
			deadline.tv_sec = time(NULL) + (m_rm_finish_log_time < 1 ? 1 : m_rm_finish_log_time);
			deadline.tv_nsec = 0;

			while(!m_stop_requested)
			{
				if(ETIMEDOUT == pthread_cond_timedwait(&m_stop_cond, &m_mutex, &deadline))
					break;
			}
			const bool keep_running = !m_stop_requested;
			pthread_mutex_unlock(&m_mutex);
			return keep_running;
		}

		// Asks the background thread to terminate and waits for it.
		void stop_cleanup_thread()
		{
			pthread_mutex_lock(&m_mutex);
			m_stop_requested = true;
			const bool started = m_rm_thread_started;
			pthread_cond_signal(&m_stop_cond);
			pthread_mutex_unlock(&m_mutex);

			if(started)
			{
				pthread_join(m_rm_thread, NULL);
				m_rm_thread_started = false;
			}
		}

		mutable pthread_mutex_t m_mutex; // guards every member below
		pthread_cond_t m_stop_cond;      // signalled when m_stop_requested is set
		pthread_t m_rm_thread;
		bool m_rm_thread_started;
		bool m_stop_requested;
		time_t m_start_time;
		int64_t m_uuid;
		int m_input_log_lines;           // lines written to the current input file
		int m_max_log_lines;
		int m_rm_finish_log_time;
		std::string m_path;
		std::string m_current_input_prefix;
		FILE *m_input_log;               // current input file, NULL when none is open
		int64_t m_log_version;           // version number of the next file pair
		std::map<std::string, redo_file_pair_info> m_file_pair_info_map;
};

#endif

//testプログラム
#include 
#include 
#include 
#include "redo_log.h"
using namespace std;

int main(int argc, char *argv[])
{
	redo_log a;
	if(a.init("/home/admin/learn/redo_log", 20000))
		cout< unfinish_records;
	a.reload_unfinish_records(unfinish_records);
	for(size_t i = 0; i < unfinish_records.size(); i++)
	{
		cout<

テストは2ステップで行います。まず `g++ test_redo_log.cpp -lpthread` でビルドします。
ステップ1: `./a.out 0` — redo log の生成と削除の状況を表示します。
ステップ2: `./a.out 1` — redo log によるリカバリの状況を確認します。