【6.C++ベース】-フレームワーク


thrift
ネットワークのセクションではthriftのプロトコル部分を簡単に紹介し、エンジニアリングではthriftを得るスレッドで同時、process、serverライブラリを使用します.idlを定義した生成コードとビジネス記述コードの関係は以下の通りです.
プロセスの実行:
1.  threaft   
    a.     n ,  ,    workerMonitor_.wait。 100    (  ,    )
    b.       ,  ,    ,workerMonitor_.notify
         monitor_.wait()
           ,             manager_->maxMonitor_.notify(),
         。    。         
2.  nonblockingserver,io    
    a. 0      start,  eventbase(iothread),createpipe,  notify;
      event_base_loop(eventBase_, 0);【   ,  io     event_base】
    b.0       ,  eventbase(iothread);    ;createpipe,  notify。
      0 io  run,    。  io  join
3.0    handleEvent
    accept
      create connection(init  )     io  (  )   ,       notifyhandler
4.      IO  notifyhandler(read notifyfd,transition)
      transition: init     setread,   workSocket,      transition
             ,  addtask=>setidle,     cfd
5.addtask
    thrift,  ,  tasks_.size() >= pendingTaskCountMax_,maxMonitor_.wait(timeout);
      task  ,     monitor_.notify()。    monitor      。
       task  process  notifyIOThread(read notifyfd,transition)。
6.     IO  
    transition cfd       ,workSocket  connenction     。
7.connenction         notifyIOThread   transition       。

まとめ:マルチreactorマルチスレッドモード、1つのaccept、複数の読み書き、個別タスク処理.通常はreactorが1つしか必要ありません.単一reactorマルチスレッド形式.
http_server
エレガントな再起動について
nginxというマルチプロセスは、サブプロセスが親プロセスから独立できるため、教えるよりもよくできます.メインプロセスforkは、傍受fd、ロックなどを継承し、exec()は完全なコードを実行します.このとき、古いサブプロセスと新しいサブプロセスはいずれも傍受fd処理接続をロックし、古いメインプロセスを閉鎖し、古いサブプロセスに閉鎖信号を送信することができ、サブプロセスは処理後に信号を傍受することができ、優雅になった.スレッドは独立して信号を傍受することができません.
接続プール
ここで0は使用可能です.しかし、本当にpingしないでください.そうしないと、代価が大きすぎます.readを使って、接続がEAGAINエラーを送信している場合は、接続中のaddが任意の接続オブジェクトです.接続の実装たとえば
    for (int i = 0; i < connectionCount; ++i) {
        RedisClient* redis = new RedisClient(host, port, conn_timeout_ms, rw_timeout_ms);
        redis->init();//CONNECT
        redisPool_.add(redis);
    }

改造されたredis_pool
接続プール+スレッドプール+hiredisは、それぞれ接続管理と同時要求処理を担当します.パッケージの目的:一般的にスライス取得データに併発するエージェントには、1つの失敗がすべて失敗し、すべての戻りを待たなければならず、mgetの失敗が拡大されるという欠点があります.したがって,mget全体のタイムアウト時間と戻りをビジネス層で制御し,エージェント層に分割してgetとし,スレッドプールで実現した.
spdlog
  • トラフィックコール
    spdlog::set_async_mode(8192*4, spdlog::async_overflow_policy::block_retry,nullptr, std::chrono::seconds(3));
    std::string info_file =  FLAGS_log_path + "/" + FLAGS_info_file
    auto debug_logger = spdlog::basic_logger_mt("debug_logger", info_file.c_str());
    debug_logger->set_pattern("INFO: %Y-%m-%d %H:%M:%S %v");
    
    
    inline std::shared_ptr<:logger> spdlog::create(const std::string& logger_name, Args... args)
    {
        sink_ptr sink = std::make_shared(args...);
        return details::registry::instance().create(logger_name, { sink });    
        /*     new_logger = std::make_shared(logger_name, sinks_begin, sinks_end, _async_q_size, _overflow_policy, _worker_warmup_cb, _flush_interval_ms, _worker_teardown_cb);    //     
        _loggers[logger_name] = new_logger;*/
    }
    
     auto logger = spdlog::get("warn_logger");\
               if (logger != NULL) { \
                   logger->info("{}:{} {}", cplusutils::servbase_basename(__FILE__), __LINE__, log_info.str()); \
               }
    
    info()=>log()->push_msg()
    
  • spdlogのpush_msgはenqueue
    inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg&& new_msg)
    {
        if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
        {
            auto last_op_time = details::os::now();
            auto now = last_op_time;
            do
            {
                now = details::os::now();
                sleep_or_yield(now, last_op_time);
            }
            while (!_q.enqueue(std::move(new_msg)));
        }
    }
  • です
  • spdlogログごとにスレッドが1つずつあり、起動後にdequeueがドロップ
    _worker_thread(&async_log_helper::worker_loop, this)
    while (active)
        {
            try
            {
                active = process_next_msg(last_pop, last_flush);
            }
        }
    
    
        inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush)
    {
        async_msg incoming_async_msg;
    
        if (_q.dequeue(incoming_async_msg))
        {
                for (auto &s : _sinks)
                {
                    if (s->should_log(incoming_log_msg.level))
                    {
                        s->log(incoming_log_msg);   //         。
                    }
                }
           
        }
        else
        {
            auto now = details::os::now();
            handle_flush_interval(now, last_flush);
            sleep_or_yield(now, last_pop);
            return !_terminate_requested;
        }
    }
    
  • にループする.
  • ロックキューなし
    bool enqueue(T&& data)
        {
            cell_t* cell;
            size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {
                cell = &buffer_[pos & buffer_mask_];
                size_t seq = cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = static_cast(seq) - static_cast(pos);
                if (dif == 0)
                {
                    if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                {
                    return false;
                }
                else
                {
                    pos = enqueue_pos_.load(std::memory_order_relaxed);
                }
            }
            cell->data_ = std::move(data);
            cell->sequence_.store(pos + 1, std::memory_order_release);
            return true;
        }
    
        bool dequeue(T& data)
        {
            cell_t* cell;
            size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {
                cell = &buffer_[pos & buffer_mask_];
                size_t seq =
                    cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = static_cast(seq) - static_cast(pos + 1);
                if (dif == 0)
                {
                    if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                    return false;
                else
                    pos = dequeue_pos_.load(std::memory_order_relaxed);
            }
            data = std::move(cell->data_);
            cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
            return true;
        }
    
    buffer配列.各seq,dataenqueue seqフロントシフトpos+1 dequeue seqフロントシフトpos+1+maskサイクル多重memory_order_relaxed:実行順序を保証しないmemory_order_acquire:このスレッドでは、後続のすべての読み取り操作は、この原子操作が完了した後にmemory_を実行する必要があります.order_release:このスレッドでは、以前の書き込み操作が完了してから、この原子操作a.compare_を実行できます.exchange_weak(n,w):aとnを比較し、等しい場合、aはwに割り当てられる.等しくなく、nはaに割り当てられ、false buffer{sequence,data}enqueue_を返します.posの2つとcellの値に1 dequeueを加算します.posはなぜacquireとrelaxedなのでしょうか.posのCASは書く原子性を保証することができる.最低relaxed.原子を単独で操作することを保証することができて、順序を保証することができなくて、このような順序に対する制限性能はきっとロックより良いですか?これは単一命令のみに制限され、性能はロックより
  • 優れている.