mysqlの接続処理手順

15928 ワード

mysqld_main関数で一連の初期化が行われるとmysqlはクライアントの接続の傍受を開始する
mysqld_socket_acceptor->connection_event_loop();

mysqld_の表示socket_acceptor:
Connection_acceptor *mysqld_socket_acceptor= NULL;

これはクラステンプレート接続です.アクセプターがMysqldを通過socket_Listenerクラスのインスタンス化、次はConnection_acceptorの定義
template  class Connection_acceptor
{
  Listener *m_listener;

public:
  Connection_acceptor(Listener *listener)
  : m_listener(listener)
  { }

  ~Connection_acceptor()
  {
    delete m_listener;
  }

  /**
    Initialize a connection acceptor.

    @retval   return true if initialization failed, else false.
  */
  bool init_connection_acceptor()
  {
    return m_listener->setup_listener();
  }

  /**
    Connection acceptor loop to accept connections from clients.
  */
  void connection_event_loop()
  {
    Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
    while (!abort_loop)
    {
      Channel_info *channel_info= m_listener->listen_for_connection_event();
      if (channel_info != NULL)
        mgr->process_new_connection(channel_info);
    }
  }

  /**
    Close the listener.
  */
  void close_listener()
  {
    m_listener->close_listener();
  }

};

このクラスは抽象的なインタフェースであり、リスニングソケットの操作をカプセル化していることがわかります.connection_event_loop関数は,リスニングされたソケット上の接続をループして読み出し,接続割り当てを行う関数である.m_Listenerは、実際に接続を取得したクラスMysqld_を指すポインタです.socket_Listenerのインスタンス、メンバー関数listen_for_connection_event()は、異なる測定オペレーティングシステムの状況に応じて、例えばlinuxでpoll関数を使用して動作するように、ソケットとの傍受動作をカプセル化する.  Connection_handler_managerは、取得した新しい接続がどのように処理されるか、各接続のスレッドを使用するか、または他のモードを使用するかを管理するグローバルな単一のモードです.このクラスの新規接続に対するいくつかのキーメンバーと関数を一時的にリストします.
class Connection_handler_manager
{
    static Connection_handler_manager* m_instance;
    Connection_handler* m_connection_handler;
    static mysql_mutex_t LOCK_connection_count;
    static mysql_cond_t COND_connection_count;

public:
    static uint connection_count;          // Protected by LOCK_connection_count
    static ulong max_used_connections;     // Protected by LOCK_connection_count
    static ulong max_used_connections_time;// Protected by LOCK_connection_count
    
    //             
    static Connection_handler_manager* get_instance()
    {
        DBUG_ASSERT(m_instance != NULL);
        return m_instance;
    }
    
    //     
    void process_new_connection(Channel_info* channel_info);
    bool check_and_incr_conn_count();
}

  m_connection_handlerは特定の接続処理者であり、そのタイプはConnection_handlerは虚ベースクラスであり、様々な接続方式がこのクラスを継承し、接続をどのように処理するかを具体的に実現し、process_new_Connection関数は、新しい接続を受信して処理する関数です.
Connection_handler_manager::process_new_connection(Channel_info* channel_info)
{
  if (abort_loop || !check_and_incr_conn_count())
  {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info))
  {
    inc_aborted_connects();
    delete channel_info;
  }
}

この関数はabort_に頼るloopは監視を停止するかどうかを判断し、これはmysqldに定義されている.ccのvolatileグローバル変数は、他のスレッドによって変更される可能性があります.check_and_incr_conn_count関数は接続数を増やすために使用され、接続数が最大接続数より大きい場合、新しい接続に失敗します.しかし、接続数が最大接続数に等しい場合でも、rootユーザー管理のために使用される接続の再確立が許可され、接続後に検証されます.したがって,実際の最大接続数は,最大接続数+1である.ループが停止せず、接続数が満たされていない場合は、m_が呼び出されます.connection_handlerのadd_接続を追加するには、ここでfalseを返すのが正常ですか?と不思議に思っています.ここでは典型的なマルチステートです.具体的な処理はConnectionの継承によって決まります.handlerのクラス.ここでは、接続ごとにスレッドごとに実装されるPer_thread_connection_handlerクラスは次のとおりです.
//sql/conn_handler/connection_handler_impl.h
    class Per_thread_connection_handler : public Connection_handler
    {
      Per_thread_connection_handler(const Per_thread_connection_handler&);
      Per_thread_connection_handler&
        operator=(const Per_thread_connection_handler&);
    
      /**
        Check if idle threads to handle connection in
        thread cache. If so enqueue the new connection
        to be picked by the idle thread in thread cache.
    
        @retval false if idle pthread was found, else true.
      */
      bool check_idle_thread_and_enqueue_connection(Channel_info* channel_info);
    
      /**
        List of pending channel info objects to be picked by idle
        threads. Protected by LOCK_thread_cache.
      */
      static std::list *waiting_channel_info_list;
    
      static mysql_mutex_t LOCK_thread_cache;
      static mysql_cond_t COND_thread_cache;
      static mysql_cond_t COND_flush_thread_cache;
    
    public:
      // Status variables related to Per_thread_connection_handler
      static ulong blocked_pthread_count;    // Protected by LOCK_thread_cache.
      static ulong slow_launch_threads;
      // System variable
      static ulong max_blocked_pthreads;
    
      static void init();
      static void destroy();
    
      /**
        Wake blocked pthreads and wait until they have terminated.
      */
      static void kill_blocked_pthreads();
    
      /**
        Block until a new connection arrives.
      */
      static Channel_info* block_until_new_connection();
    
      Per_thread_connection_handler() {}
      virtual ~Per_thread_connection_handler() { }
    
    protected:
      virtual bool add_connection(Channel_info* channel_info);
    
      virtual uint get_max_threads() const;
    };

このクラスはone thread per connectionのモードで接続とスレッドを管理するために使用され、ここで詳細に検討することができる.まず、mysqlのone thread per connectionモードでは、接続ごとにスレッドが1つ占有され、mysqlはクラスのmax_から最大数のスレッドの一部を再利用するためにキャッシュします.blocked_pthreads制御(この変数はmysqld.ccのinit_common_variablesでユーザ設定に従って初期化される)channel_info_Listでは消費される要素で、ここでは接続情報を保存するchannel_infoのポインタ.スレッドが自分のタスクを処理するとblock_を呼び出すuntil_new_接続関数:
Channel_info* Per_thread_connection_handler::block_until_new_connection()
{
  Channel_info *new_conn= NULL;
  mysql_mutex_lock(&LOCK_thread_cache);
  if (blocked_pthread_count < max_blocked_pthreads &&
      !kill_blocked_pthreads_flag)
  {
    /* Don't kill the pthread, just block it for reuse */
    DBUG_PRINT("info", ("Blocking pthread for reuse"));

    /*
      mysys_var is bound to the physical thread,
      so make sure mysys_var->dbug is reset to a clean state
      before picking another session in the thread cache.
    */
    DBUG_POP();
    DBUG_ASSERT( ! _db_is_pushed_());

    // Block pthread
    blocked_pthread_count++;
    while (!abort_loop && !wake_pthread && !kill_blocked_pthreads_flag)
      mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
    blocked_pthread_count--;

    if (kill_blocked_pthreads_flag)
      mysql_cond_signal(&COND_flush_thread_cache);
    else if (!abort_loop && wake_pthread)
    {
      wake_pthread--;
      DBUG_ASSERT(!waiting_channel_info_list->empty());
      new_conn= waiting_channel_info_list->front();
      waiting_channel_info_list->pop_front();
      DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
    }
  }
  mysql_mutex_unlock(&LOCK_thread_cache);
  return new_conn;
}

最外層のif文から、キャッシュするためにブロックされたスレッドの数が最大値に達しず、kill_blocked_pthreads_flagフラグが設定されていない場合(閉じると、ブロックされたすべてのスレッドを終了するためにフラグが設定されます)、スレッドは条件変数COND_にブロックされます.thread_Cache上:
while (!abort_loop && !wake_pthread && !kill_blocked_pthreads_flag)
      mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);

  新しい接続が来ると、キャッシュされたスレッドがあるかどうかをチェックし、ある場合はCOND_を使用します.thread_Cache起動は上のスレッドにブロックされており、起動の原因が新しい接続の到来であると、起動されたスレッドはキューから処理対象の接続を取り出し、この接続に戻って処理を行うことがコードから分かる.
else if (!abort_loop && wake_pthread)
{
    wake_pthread--;
    DBUG_ASSERT(!waiting_channel_info_list->empty());
    new_conn= waiting_channel_info_list->front();
    waiting_channel_info_list->pop_front();
    DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
    }

次に、新しい接続を追加する方法を見てみましょう.名前から、新しい接続を追加する関数はadd_であることがわかります.connection、この関数は虚ベースクラスから継承されています.handlerの虚関数、接続管理クラスConnection_handler_managerのprocess_new_接続関数呼び出しadd_接続を実際に追加するには、この関数の主なコードを見てみましょう.
bool Per_thread_connection_handler::add_connection(Channel_info* channel_info)
{
  int error= 0;
  my_thread_handle id;
  //      
  ···
  if (!check_idle_thread_and_enqueue_connection(channel_info))
    DBUG_RETURN(false);

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error= mysql_thread_create(key_thread_one_connection, &id,
                             &connection_attrib,
                             handle_connection,
                             (void*) channel_info);
#ifndef DBUG_OFF
handle_error:
#endif // !DBUG_OFF

  if (error)
  {
    //    
    ···
  }

  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info",("Thread created"));
  DBUG_RETURN(false);
}

  コードを簡略化した後、論理は比較的にはっきりしていて、プログラムは先にcheck_を呼び出しますidle_thread_and_enqueue_Connectionは、キャッシュされたスレッドが使用可能かどうかを確認し、ある場合はchannelinfoをキューに直接挿入し、ブロックされたスレッドを起動し、ない場合は接続処理されたスレッドを新規作成します.check_idle_thread_and_enqueue_接続コードは次のとおりです.
bool Per_thread_connection_handler::check_idle_thread_and_enqueue_connection(Channel_info* channel_info)
{
  bool res= true;

  mysql_mutex_lock(&LOCK_thread_cache);
  if (Per_thread_connection_handler::blocked_pthread_count > wake_pthread)
  {
    DBUG_PRINT("info",("waiting_channel_info_list->push %p", channel_info));
    waiting_channel_info_list->push_back(channel_info);
    wake_pthread++;
    mysql_cond_signal(&COND_thread_cache);
    res= false;
  }
  mysql_mutex_unlock(&LOCK_thread_cache);

  return res;
}

  この関数は比較的簡単で、空きスレッドがある場合、処理対象の接続をキューに追加し、条件変数に信号を送信し、1つのスレッドを起動して処理を行う.ここでpthreadの意味は、接続を取得するためにどれだけのスレッドが必要かと考えられます.wake_pthreadは増加するとブロックされたスレッドを呼び出し、接続を取得するとwake_pthreadはさらに自滅するのでblocked_pthread_count > wake_pthreadの場合、空のスレッドがあります.そうしないと、新しいスレッドが必要です.
error= mysql_thread_create(key_thread_one_connection, &id,
                             &connection_attrib,
                             handle_connection,
                             (void*) channel_info);

  ここが接続を具体的に処理する入口で、以下は簡略化されたhandle_接続関数:
extern "C" void *handle_connection(void *arg)
{
  Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
  Connection_handler_manager *handler_manager=
    Connection_handler_manager::get_instance();
  Channel_info* channel_info= static_cast(arg);
  bool pthread_reused MY_ATTRIBUTE((unused))= false;

  if (my_thread_init())
  {
    //     PS:mysql            ,       false    ,true    
    ···
  }
  
  for (;;)
  {
    THD *thd= init_new_thd(channel_info);
    if (thd == NULL)
    {
      //    
      break; // We are out of resources, no sense in continuing.
    }
    ···
    thd_manager->add_thd(thd);

    if (thd_prepare_connection(thd))
      handler_manager->inc_aborted_connects();
    else
    {//      ,      
      while (thd_connection_alive(thd))
      {
        if (do_command(thd))
          break;
      }
      end_connection(thd);
    }
    //           
    close_connection(thd, 0, false, false);
    thd->get_stmt_da()->reset_diagnostics_area();
    thd->release_resources();
    ERR_remove_state(0);
    thd_manager->remove_thd(thd);
    Connection_handler_manager::dec_connection_count();
    delete thd;

    if (abort_loop) // Server is shutting down so end the pthread.
      break;
    //          (                 )
    channel_info=Per_thread_connection_handler::block_until_new_connection();
    if (channel_info == NULL)
      break;
    pthread_reused= true;
  }
  //    
  my_thread_end();
  my_thread_exit(0);
  return NULL;
}

ここでは、スレッド管理の単一クラスGlobal_を最初に取得します.THD_manager、および接続された管理クラスConnection_handler_manager.Global_THD_managerは各スレッドのTHD構造体を1つのチェーンテーブルに直列に接続して統一的に管理し,ここでは先に展開しない.各接続スレッドごとのモードを使用する場合、1つのTHD構造は1つのスレッドに対応するが、必ずしもそうではない.例えば、スレッドプールを使用するとそうではない.混同しないために、後に称されるスレッドは、実際には処理を開始する接続を指し、スレッド構造体は、対応するTHD構造体である.真のスレッドを指すには、実際のスレッドまたは物理スレッドの言い方が使用されます.  はその後スレッド初期化を行い,主にDEBUG用のいくつかのスレッド局所変数を初期化した.その後のforループはスレッドの主な論理であり,このスレッドの構造体THDが最初に作成され,この構造体はスレッドのコンテキスト情報を保存し,非常に重要であり,しばしばいくつかの関数のパラメータとして用いられる.例えば、THDがクライアント接続に対応する場合、THD構造体には接続のすべての情報、権限、アカウント、トランザクションステータスなどが含まれています.その後、このTHDはGlobal_に追加されました.THD_managerが管理するチェーンテーブル.  接続の検証(権限検証など)を行った後、ループに入ります.
while (thd_connection_alive(thd))
{
    if (do_command(thd))
        break;
}
end_connection(thd);

 ここでは、接続されたコマンドの処理であり、接続されたコマンドをループして受信し、killedまたは実行によって何らかのエラーが発生してから終了するまで実行する.ループを終了した後、接続の後始末を行い、前述のようにPer_を呼び出す.thread_connection_handler::block_until_new_Connection()は、次の接続で使用するためにスレッドをキャッシュしようとします.最後に、スレッドが終了すると、いくつかのクリーンアップ作業が行われます.  do_command関数とは,クライアントコマンドを実行する関数を受信し,ネットワークプロトコルやエラー処理などを簡略化した後,その主な論理を得るコードである.
//sql/sql_parse.cc
bool do_command(THD *thd)
{
  NET *net= NULL;
  enum enum_server_command command;
  COM_DATA com_data;
  rc= thd->get_protocol()->get_command(&com_data, &command);
  return_value= dispatch_command(thd, &com_data, command);
}

  は、ループごとにネットワークから命令データとタイプを読み出し、dispatch_commandは具体的なコマンド実行を行い、接続が新しく作成されたり、要求がなかったりすると、スレッドがブロックされます.これでmysqlのコマンド配布プロセスに入ります.dispatch_command関数体は非常に長く、ここでは主要な論理の一部しか残っていません.
//sql/sql_parse.cc
bool dispatch_command(THD *thd, const COM_DATA *com_data,
                      enum enum_server_command command)
{
  bool error= 0;
  Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
  thd->set_command(command);
  thd->enable_slow_log= TRUE;
  thd->lex->sql_command= SQLCOM_END; /* to avoid confusing VIEW detectors */
  thd->set_time();
  
  thd_manager->inc_thread_running();
  switch (command) {
  case COM_INIT_DB:
  ···
  case COM_REGISTER_SLAVE:
  ···
  ···
  case COM_QUERY:
  {
     ···
     if (alloc_query(thd, com_data->com_query.query,
                    com_data->com_query.length))
        break;
     ···
     mysql_parse(thd, &parser_state);
     ···
  }
}

この関数の主な論理は、コマンドのタイプに応じて異なる処理を行う巨大な無覇switch case文です.ここで注意COM_QUERY、彼の名前は「QUERY」ですが、クエリーを処理しているように見えますが、実際にはデータベースへのアクセス操作はすべてここを通じて行われています.よくあるDDL、DML文などです.コードを簡略化した論理は次のとおりです.
//sql/sql_pasrse.cc
void mysql_parse(THD *thd, Parser_state *parser_state)
{
//         
  mysql_reset_thd_for_next_command(thd);
  lex_start(thd);
  
  if (query_cache.send_result_to_client(thd, thd->query()) <= 0)
  {
    ···
    err= parse_sql(thd, parser_state, NULL);
    ···
    error= mysql_execute_command(thd, true);
    ···
  }
  ···
}

  ここには主要な入り口しかリストされていませんが、その他の詳細はしばらく分かりません....この関数は、クエリーの文がクエリー・キャッシュ内にあるかどうかを判断し(完全に同じ文のみが一致し、スペースや大文字と小文字も含まれる)、クライアントに直接送信されないと、文を構文解析し、実際に実行します.実行関数はmysql_execute_command
int
mysql_execute_command(THD *thd, bool first_level)
{
  //         
  LEX  *const lex= thd->lex;
  //       
  TABLE_LIST *all_tables;
  all_tables= lex->query_tables;
  ···
    switch (lex->sql_command) 
    {

      case SQLCOM_SHOW_STATUS:
      ···
      case SQLCOM_INSERT:
      case SQLCOM_REPLACE_SELECT:
      case SQLCOM_INSERT_SELECT:
      {
        DBUG_ASSERT(first_table == all_tables && first_table != 0);
        DBUG_ASSERT(lex->m_sql_cmd != NULL);
        res= lex->m_sql_cmd->execute(thd);
        break;
      }
      ···
      case SQLCOM_SHOW_PROCESSLIST:
      ···
    }
}

ここでは最終的に各文の具体的な操作の入り口であり、これはまた巨大なswitch case文であり、その中の各caseは私たちがよく知っている命令であり、その中の各命令の実行については、もう深くなく、ここで接続処理の追跡が完了した.