【6.C++ベース】-フレームワーク
thrift
ネットワークのセクションではthriftのプロトコル部分を簡単に紹介し、エンジニアリングではthriftを得るスレッドで同時、process、serverライブラリを使用します.idlを定義した生成コードとビジネス記述コードの関係は以下の通りです.
プロセスの実行:
まとめ:マルチreactorマルチスレッドモード、1つのaccept、複数の読み書き、個別タスク処理.通常はreactorが1つしか必要ありません.単一reactorマルチスレッド形式.
http_server
エレガントな再起動について
nginxというマルチプロセスは、サブプロセスが親プロセスから独立できるため、教えるよりもよくできます.メインプロセスforkは、傍受fd、ロックなどを継承し、exec()は完全なコードを実行します.このとき、古いサブプロセスと新しいサブプロセスはいずれも傍受fd処理接続をロックし、古いメインプロセスを閉鎖し、古いサブプロセスに閉鎖信号を送信することができ、サブプロセスは処理後に信号を傍受することができ、優雅になった.スレッドは独立して信号を傍受することができません.
接続プール
ここで0は使用可能です.しかし、本当にpingしないでください.そうしないと、代価が大きすぎます.readを使って、接続がEAGAINエラーを送信している場合は、接続中のaddが任意の接続オブジェクトです.接続の実装たとえば
改造されたredis_pool
接続プール+スレッドプール+hiredisは、それぞれ接続管理と同時要求処理を担当します.パッケージの目的:一般的にスライス取得データに併発するエージェントには、1つの失敗がすべて失敗し、すべての戻りを待たなければならず、mgetの失敗が拡大されるという欠点があります.したがって,mget全体のタイムアウト時間と戻りをビジネス層で制御し,エージェント層に分割してgetとし,スレッドプールで実現した.
spdlogトラフィックコール spdlogのpush_msgはenqueue です spdlogログごとにスレッドが1つずつあり、起動後にdequeueがドロップ にループする.ロックキューなし 優れている.
ネットワークのセクションではthriftのプロトコル部分を簡単に紹介し、エンジニアリングではthriftを得るスレッドで同時、process、serverライブラリを使用します.idlを定義した生成コードとビジネス記述コードの関係は以下の通りです.
プロセスの実行:
1. threaft
a. n , , workerMonitor_.wait。 100 ( , )
b. , , ,workerMonitor_.notify
monitor_.wait()
, manager_->maxMonitor_.notify(),
。 。
2. nonblockingserver,io
a. 0 start, eventbase(iothread),createpipe, notify;
event_base_loop(eventBase_, 0);【 , io event_base】
b.0 , eventbase(iothread); ;createpipe, notify。
0 io run, 。 io join
3.0 handleEvent
accept
create connection(init ) io ( ) , notifyhandler
4. IO notifyhandler(read notifyfd,transition)
transition: init setread, workSocket, transition
, addtask=>setidle, cfd
5.addtask
thrift, , tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout);
task , monitor_.notify()。 monitor 。
task process notifyIOThread(read notifyfd,transition)。
6. IO
transition cfd ,workSocket connenction 。
7.connenction notifyIOThread transition 。
まとめ:マルチreactorマルチスレッドモード、1つのaccept、複数の読み書き、個別タスク処理.通常はreactorが1つしか必要ありません.単一reactorマルチスレッド形式.
http_server
エレガントな再起動について
nginxというマルチプロセスは、サブプロセスが親プロセスから独立できるため、教えるよりもよくできます.メインプロセスforkは、傍受fd、ロックなどを継承し、exec()は完全なコードを実行します.このとき、古いサブプロセスと新しいサブプロセスはいずれも傍受fd処理接続をロックし、古いメインプロセスを閉鎖し、古いサブプロセスに閉鎖信号を送信することができ、サブプロセスは処理後に信号を傍受することができ、優雅になった.スレッドは独立して信号を傍受することができません.
接続プール
ここで0は使用可能です.しかし、本当にpingしないでください.そうしないと、代価が大きすぎます.readを使って、接続がEAGAINエラーを送信している場合は、接続中のaddが任意の接続オブジェクトです.接続の実装たとえば
for (int i = 0; i < connectionCount; ++i) {
RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms);
redis->init();//CONNECT
redisPool_.add(redis);
}
改造されたredis_pool
接続プール+スレッドプール+hiredisは、それぞれ接続管理と同時要求処理を担当します.パッケージの目的:一般的にスライス取得データに併発するエージェントには、1つの失敗がすべて失敗し、すべての戻りを待たなければならず、mgetの失敗が拡大されるという欠点があります.したがって,mget全体のタイムアウト時間と戻りをビジネス層で制御し,エージェント層に分割してgetとし,スレッドプールで実現した.
spdlog
spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3));
std::string info_file = FLAGS_log_path + "/" + FLAGS_info_file
auto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str());
debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v");
inline std::shared_ptr<:logger> spdlog::create(const std::string& logger_name, Args... args)
{
sink_ptr sink = std::make_shared(args...);
return details::registry::instance().create(logger_name, { sink });
/* new_logger = std::make_shared(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb); //
_loggers[logger_name] = new_logger;*/
}
auto logger = spdlog::get("warn_logger");\
if (logger != NULL) { \
logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str()); \
}
info()=>log()->push_msg()
inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
{
if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
{
auto last_op_time = details::os::now();
auto now = last_op_time;
do
{
now = details::os::now();
sleep_or_yield(now, last_op_time);
}
while (!_q.enqueue(std::move(new_msg)));
}
}
_worker_thread(&async_log_helper::worker_loop, this)
while (active)
{
try
{
active = process_next_msg(last_pop, last_flush);
}
}
inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
{
async_msg incoming_async_msg;
if (_q.dequeue(incoming_async_msg))
{
for (auto &s : _sinks)
{
if (s->should_log(incoming_log_msg.level))
{
s->log(incoming_log_msg); // 。
}
}
}
else
{
auto now = details::os::now();
handle_flush_interval(now, last_flush);
sleep_or_yield(now, last_pop);
return !_terminate_requested;
}
}
bool enqueue(T&& data)
{
cell_t* cell;
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast(seq) - static_cast(pos);
if (dif == 0)
{
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
break;
}
else if (dif < 0)
{
return false;
}
else
{
pos = enqueue_pos_.load(std::memory_order_relaxed);
}
}
cell->data_ = std::move(data);
cell->sequence_.store(pos + 1, std::memory_order_release);
return true;
}
bool dequeue(T& data)
{
cell_t* cell;
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq =
cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast(seq) - static_cast(pos + 1);
if (dif == 0)
{
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
break;
}
else if (dif < 0)
return false;
else
pos = dequeue_pos_.load(std::memory_order_relaxed);
}
data = std::move(cell->data_);
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
return true;
}
buffer配列.各seq,dataenqueue seqフロントシフトpos+1 dequeue seqフロントシフトpos+1+maskサイクル多重memory_order_relaxed:実行順序を保証しないmemory_order_acquire:このスレッドでは、後続のすべての読み取り操作は、この原子操作が完了した後にmemory_を実行する必要があります.order_release:このスレッドでは、以前の書き込み操作が完了してから、この原子操作a.compare_を実行できます.exchange_weak(n,w):aとnを比較し、等しい場合、aはwに割り当てられる.等しくなく、nはaに割り当てられ、false buffer{sequence,data}enqueue_を返します.posの2つとcellの値に1 dequeueを加算します.posはなぜacquireとrelaxedなのでしょうか.posのCASは書く原子性を保証することができる.最低relaxed.原子を単独で操作することを保証することができて、順序を保証することができなくて、このような順序に対する制限性能はきっとロックより良いですか?これは単一命令のみに制限され、性能はロックより