boost::asio接続管理8

7060 ワード

前編になると、完全な単一スレッドバージョンが表示されます.コンカレント要求が高くなければ、シングルスレッド+非同期I/Oで十分です.
しかし,大同時性をサポートするためには,サーバ上の複数のCPUとコアを可能な限り利用する必要がある.
今はまず前の工事をcmake工事に変更します.
最上位ディレクトリのCMakeLists.txtの内容は以下の通りです.
cmake_minimum_required(VERSION 2.8)
project (TcpTemplate)
add_subdirectory(src bin)

srcディレクトリの下のCMakeLists.txtファイル構成:
cmake_minimum_required(VERSION 2.8)
set(CMAKE_BUILD_TYPE Debug)
set(PROJECT_INCLUDE_DIR ../include)

find_package(Boost COMPONENTS system filesystem thread REQUIRED)
include_directories(${Boost_INCLUDE_DIR} ${PROJECT_INCLUDE_DIR})
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src CPP_LIST1)
AUX_SOURCE_DIRECTORY(${CMAKE_SOURCE_DIR}/src/core CPP_LIST2)

add_executable(service ${CPP_LIST1} ${CPP_LIST2})
target_link_libraries(service ${Boost_LIBRARIES})

add_definitions(-Wall)

はい、今mainを見てください.ccは、あまり変わっていませんが、Serverクラスのコードを剥がしただけです.
#include <iostream>
#include "core/server.h"

using namespace std;

int main(int argc,char ** argv) {
    try {
	io_service iosev;
	tcp::endpoint listen_endpoint(tcp::v4(), 8888);
        Server server(iosev, listen_endpoint, 10);
        server.Run();
    } catch(std::exception const& ex) {
      cout << "Exception: " << ex.what() << "";
    }
}

ServerクラスとConnectionクラスはcoreディレクトリの下に配置されます.注意、ここでは効率のために、Connectionsクラスがすべてのconnectionを統一的に管理することをキャンセルしました.各connectionは自分で接続を閉じるか、io_のためにservice.stopの呼び出しですべての接続を閉じます.
server.hファイルコード:
#ifndef CORE_SERVER_H_
#define CORE_SERVER_H_

#include <boost/asio.hpp>
#include "core/connection.h"

using namespace boost;
using boost::system::error_code;
using namespace boost::asio;
using ip::tcp;


// Crate a thread pool for io_service
// Run the io_service to accept new incoming TCP connection and handle the I/O events
class Server {
 public:
  Server(io_service& io_service, tcp::endpoint const& listen_endpoint, size_t threads_number);

  // Create a thread pool for io_service
  // Launch io_service
  void Run();

  void AfterAccept(shared_ptr<Connection>& connection, error_code const& ec);

 private:
  void Stop();

 private:
  io_service& io_;
  boost::asio::signal_set signals_;
  tcp::acceptor acceptor_;
  size_t thread_pool_size_;
};

#endif

server.ccファイル内容:
#include "core/server.h"
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <vector>
#include "core/connection.h"

using namespace boost;

Server::Server(io_service& s, tcp::endpoint const& listen_endpoint, size_t threads_number)
  : io_(s),
    signals_(s),
    acceptor_(io_, listen_endpoint),
    thread_pool_size_(threads_number) {
  signals_.add(SIGINT);
  signals_.add(SIGTERM);
#if defined(SIGQUIT)
  signals_.add(SIGQUIT);
#endif
  signals_.async_wait(bind(&Server::Stop, this));
  shared_ptr<Connection> c(new Connection(io_));
          
  acceptor_.async_accept(c->socket, bind(&Server::AfterAccept, this, c, _1));
    }

void Server::AfterAccept(shared_ptr<Connection>& c, error_code const& ec) {
  // Check whether the server was stopped by a signal before this completion
  // handler had a chance to run.
  if (!acceptor_.is_open()) {
    cout << "acceptor is closed" << endl;
    return;
  }
        
  if (!ec) {
    c->StartJob();
    shared_ptr<Connection> c2(new Connection(io_));
    acceptor_.async_accept(c2->socket,
			   boost::bind(&Server::AfterAccept, this, c2, _1));
  }
}

void Server::Run() {
  // Create a pool of threads to run all of the io_services.
  vector<shared_ptr<thread> > threads;
  for (size_t i = 0; i < thread_pool_size_; ++i) {
    shared_ptr<thread> t(new thread(bind(&io_service::run, &io_)));
    threads.push_back(t);
  }

  // Wait for all threads in the pool to exit.
  for (std::size_t i = 0; i < threads.size(); ++i) {
    threads[i]->join();
  }
}

void Server::Stop() {
  cout << "stopping" << endl;
  acceptor_.close();
  io_.stop();
}

Runの実装は複雑になり,スレッドプールの作成に変更され,スレッドはio_にバインドされた.Service::runは、io_までスレッドがこの関数を実行することを意味します.サービスは閉じられているので終了します.
その後、すべてのスレッドが終了するまでメインスレッドは待機し、自分で終了します.
Stop関数はConnectionsクラスがないため簡略化されました.
Connectionクラスの実装を見てみましょうhファイル内容:
#ifndef CORE_CONNECTION_H_
#define	CORE_CONNECTION_H_

#include <set>
#include <algorithm>
#include <vector>
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>

using namespace boost::asio;
using ip::tcp;
using boost::system::error_code;

using namespace boost;
using namespace std;

class Connection: public boost::enable_shared_from_this<Connection> {
public:
    Connection(io_service& s);
    
    ~Connection();
    
    void StartJob();
    
    void CloseSocket();
    
    void AfterReadChar(error_code const& ec);
    
public:
    tcp::socket socket;
    
private:
    vector<char> read_buffer_;
    /// Strand to ensure the connection's handlers are not called concurrently.
    boost::asio::io_service::strand strand_;
};


#endif

ここで重要なのはstrand_メンバー.説明します.
さっきserverはスレッドプールを作成し、プール内の各スレッドがio_を呼び出しました.サービス::runメソッド.boost::asioの原則に従って、これらのスレッドはconnectionの非同期I/O処理関数、つまり私のAfterを呼び出す平等な機会を持っています.で行ないます.これはマルチスレッド環境であるため、connectionの有効な非同期I/O処理関数(1つまたは複数)が同時にいくつかのスレッドによって呼び出される可能性があります.状態の不一致を防ぐためにstrandは約束を提供して、もし私たちのbindがそれに渡して一度パッケージを打って、io_に伝えますサービスでは、これらのコールバックは、同じ時点でスレッド呼び出しが1つしかないことを保証します.つまり、ロープ(strandオブジェクト)で時間順に直列に接続されます.
connection.ccコード:
#include "core/connection.h"
#include <boost/bind.hpp>

Connection::Connection(io_service& s)
  : socket(s), read_buffer_(1, 0), strand_(s) {

}

Connection::~Connection() {
    cout << "~Connection" << endl;
}

void Connection::StartJob() {
    cout << "the new connection object is starting now." << endl;
    async_read(socket, buffer(read_buffer_),
	       strand_.wrap(bind(&Connection::AfterReadChar, shared_from_this(), _1)));
}

void Connection::CloseSocket() {
    cout << "closing the socket" << endl;
    socket.shutdown(tcp::socket::shutdown_both);
    socket.close();
}

void Connection::AfterReadChar(error_code const& ec) {
    if (ec) {
        cout << ec.message() << endl;
        return;
    }

    char x = read_buffer_[0];
    if (x == 'a') {
        cout << "correct data received" << endl;
        async_read(socket, buffer(read_buffer_),
		   strand_.wrap(bind(&Connection::AfterReadChar, shared_from_this(), _1)));
    } else {
        cout << "wrong data received, char is:" << (int) x << endl;
        CloseSocket();
    }
}

はい、スレッドプールの同時化が実現され、単一Connectionオブジェクトの呼び出しの線形化が保証されています.