Nginxイベント処理(epoll)


イベント処理はNginx処理要求の核心であり、各サブプロセスはngx_にある.worker_process_cycle()のループでngx_が呼び出され続けますprocess_events_and_timers()関数は、様々なイベントを処理します.次に,epollメカニズム(Linuxで最もよく用いられる大同時イベントトリガメカニズム)を用いたNginxイベント処理のプロセスを解析し,ソースコード解析とdebug情報追跡の2つの方法を用いる.
私たちはngxからworker_process_cycle()関数(ワークプロセス処理要求のループ)切り込み:
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
	/*...*/
	//    :   
	ngx_worker_process_init(cycle, 1);
	/*...*/
	for ( ;; ) {
		/*...*/
		//    :    
		ngx_process_events_and_timers(cycle);
		/*...*/
	}
	/*...*/
}
 
//    :   
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_uint_t priority)
{
	//        
	//...
	//  uid,groupid 
	//...
	//     CPU affinity
	//...
	//          
	//...
	//       
	//...
	//    socket      
	//...

	//       init_process    
	//    -->                
	//init_process
	for (i = 0; ngx_modules[i]; i++) {
		if (ngx_modules[i]->init_process) {
			//   event module: ngx_event_process_init()   
			if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
				exit(2);
			}
		}
	}
	//      channel[1]  ,     
	//          ngx_processes  ,          channel[1]
	//...
	//    channel[0]  
	//     channel[0]       ,       sendmsg
	//...
	//  ngx_add_channel_event()  , ngx_channel           。
	// ngx_start_worker_processes()   ,ngx_channel = ngx_processes[s].channel[1];
	//ngx_channel       channel[1],     socket
	//ngx_channel_handler   channel      ,      ,      
	if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR)
	{
		exit(2);
	}
}
 
各モジュールにはグローバルなngx_があります.module_t構造変数は、worker processが作成された後、ngx_worker_process_Init()内で各モジュールに呼び出されるinit_プロセスフック.ここでngx_event_core_module
init_プロセスフックはngx_を指しますevent_process_Init()関数、この関数は次のとおりです.
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle)
{
	/*...*/

	for (m = 0; ngx_modules[m]; m++) {
		//  ngx_event_core_module ngx_epoll_module NGX_EVENT_MODULE   
		if (ngx_modules[m]->type != NGX_EVENT_MODULE) {
			continue;
		}
		if (ngx_modules[m]->ctx_index != ecf->use) {
			continue;
		}
		//       
		module = ngx_modules[m]->ctx;
		//     
		//ngx_epoll_module(  ngx_module_t)        ,        ngx_epoll_module_ctx    , init          
		// epoll  ngx_epoll_init
		if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
			exit(2);
		}
		break;
	}

	/*...*/

	/*=========   connections=========*/

	//  connection_n    connections
	cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
	c = cycle->connections;
	//  connection_n    read_events write_events,     read_events write_events
	cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log);
	rev = cycle->read_events;
	for (i = 0; i < cycle->connection_n; i++) {
		rev[i].closed = 1;
		rev[i].instance = 1;
		/*     */
	}
	cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log);
	wev = cycle->write_events;
	for (i = 0; i < cycle->connection_n; i++) {
		wev[i].closed = 1;
		/*     */
	}

	// connection read_event,write_event    ,  connection     read_event write_event
	i = cycle->connection_n;
	next = NULL;
	do {
		// connection    
		i--;
		c[i].data = next;
		c[i].read = &cycle->read_events[i];
		c[i].write = &cycle->write_events[i];
		c[i].fd = (ngx_socket_t) -1;
		next = &c[i];
		/*     */
	} while (i);

	//free_connections  connections  
	cycle->free_connections = next;
	//   free connection   
	cycle->free_connection_n = cycle->connection_n;

	/* for each listening socket */
	ls = cycle->listening.elts;
	for (i = 0; i < cycle->listening.nelts; i++) {
		c = ngx_get_connection(ls[i].fd, cycle->log);//       connection,    file descriptor
		c->log = &ls[i].log;
		c->listening = &ls[i];//  ngx_listening_s  
		ls[i].connection = c; //  ngx_listening_s.connection
		rev = c->read;
		rev->log = c->log;
		rev->accept = 1;      //    
	
		/*...*/

		//  c->read handler ngx_event_accept
		rev->handler = ngx_event_accept;
		if (ngx_use_accept_mutex) {
			continue;
		}
		if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
			if (ngx_add_conn(c) == NGX_ERROR) {
				return NGX_ERROR;
			}
		} 
		else {
			if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
				return NGX_ERROR;
			}
		}
	}
}
 
上の関数でepollを使用すると、epollモジュールのngx_epoll_Init()関数は呼び出されますが、この関数の中で最も重要なのは
ngx_event_actions = ngx_epoll_module_ctx.actions
 
ワークプロセスのforループでイベントを処理するために使用されるngx_process_events_and_timers()では、ngx_が呼び出されるたびにprocess_イベント()の場合、実は呼び出しです
ngx_epoll_module_ctx.Actionsの中のngx_epoll_process_events().
//    :    
//     
//     (      ),         (  )
//  accept queue event queue     
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    	
	//     accept mutex  
	//nginx uses accept mutex to serialize accept() syscalls
	//       mutex
	if (ngx_use_accept_mutex) {
		//       
		if (ngx_accept_disabled > 0) {
			ngx_accept_disabled--;
		}
		else {
			//  ngx_trylock_accept_mutex()   cycle  ,  ngx_accept_mutex_held  1
			//      lock,   ngx_enable_accept_events()
			//ngx_enable_accept_events()    ngx_add_event()/ngx_add_conn()
			// ,  lock worker process      “    ”   
			if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
				return;
			}
			//ngx_accept_mutex_held           
			//      ,  flags  NGX_POST_EVENTS,       accept  
			//     ,        , ngx_epoll_process_events    
			//rev/wev->handler()
			if (ngx_accept_mutex_held) {
				flags |= NGX_POST_EVENTS;
			}
			else {
				if (timer == NGX_TIMER_INFINITE || 
				    timer > ngx_accept_mutex_delay) {
					//ngx_accept_mutex_delay        ,           
					timer = ngx_accept_mutex_delay;
				}
			}
		}
	}

    	//#define ngx_process_events   ngx_event_actions.process_events
	//In epoll, ngx_event_actions = ngx_epoll_module_ctx.actions;
	//    ngx_epoll_module_ctx(  ngx_event_module_t),  actions(  ngx_event_actions_t),  process_events  
	//    epoll ngx_epoll_process_events()
	(void) ngx_process_events(cycle, timer, flags);

	delta = ngx_current_msec - delta;//  process    

	//ngx_posted_accept_events     --> accept    ,    
	//accept  ,        accept        
	//rev->handler = ngx_event_accept ngx_event_process_init    
	if (ngx_posted_accept_events) {
		ngx_event_process_posted(cycle, &ngx_posted_accept_events);
	}

	//              ,        accept ,    
	if (ngx_accept_mutex_held) {
		ngx_shmtx_unlock(&ngx_accept_mutex);
	}

	if (delta) {
		ngx_event_expire_timers();
	}

	//  accept                ,       ,         
	if (ngx_posted_events) {
		//         
		if (ngx_threaded) {
			ngx_wakeup_worker_thread(cycle);
		} else {
			ngx_event_process_posted(cycle, &ngx_posted_events);
		}
	}
}
 
ここまで言うと重要な構造ngx_module_t、ここには非常に重要な関数フックが定義されています.
struct ngx_module_s {
	//ctx_index         ,nginx         :core、event、http 
	//mail,              ,  ctx_index                

	ngx_uint_t            ctx_index;    

	//index        ,       ngx_modules[]        
	//( objs/ngx_modules.c), 0             
	ngx_uint_t            index; 

	/*...*/
	ngx_uint_t            version;

	//ctx       ,              ,             ,           ,  ctx void *
	void                 *ctx; 

	//commands        ,nginx                  ,    
	//             ,               ngx_command_t   
	//  ,nginx                      commands     ,
	//                                        
	//    (  include  ),              ,          
	//  unix                。
	ngx_command_t        *commands;

	//type       ,      ,nginx    core、event、http mail  ,type          。 
	ngx_uint_t            type; 

	//init_master、 init_module、init_process、init_thread、exit_thread、
	//exit_process、 exit_master     ,              ,
	//            master、     、       、     、
	//    、         master      ,              ,
	//          ,                         
	ngx_int_t           (*init_master)(ngx_log_t *log);
	ngx_int_t           (*init_module)(ngx_cycle_t *cycle);
	ngx_int_t           (*init_process)(ngx_cycle_t *cycle);
	ngx_int_t           (*init_thread)(ngx_cycle_t *cycle);
	void                (*exit_thread)(ngx_cycle_t *cycle);
	void                (*exit_process)(ngx_cycle_t *cycle);
	void                (*exit_master)(ngx_cycle_t *cycle);

	/*...*/

};
 
//epoll       
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{

	//        event_list
	events = epoll_wait(ep, event_list, (int) nevents, timer);

	if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
		ngx_time_update();
	}

	//  ngx_posted_events_mutex
	//ngx_posted_events_mutex   NGX_THREAD         
	ngx_mutex_lock(ngx_posted_events_mutex);

	for (i = 0; i < events; i++) {
		//     connection
		c = event_list[i].data.ptr;
		instance = (uintptr_t) c & 1;
		c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
		rev = c->read;

		revents = event_list[i].events;

		if ((revents & (EPOLLERR|EPOLLHUP)) && 
		   (revents & (EPOLLIN|EPOLLOUT)) == 0) {
		/*
		* if the error events were returned without EPOLLIN or EPOLLOUT,
		* then add these flags to handle the events at least in one active handler
		*/
			revents |= EPOLLIN|EPOLLOUT;
		}

		//default rev->active is 1
		if ((revents & EPOLLIN) && rev->active) {
			if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
				rev->posted_ready = 1;
			}
		} 
		else {
			rev->ready = 1;
		}
		if (flags & NGX_POST_EVENTS) {
		//       ,accept     1;accept()       (timeout),accept  0
		//         ,       queue(ngx_posted_accept_events  ngx_posted_events)  
			queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);
			ngx_locked_post_event(rev, queue);
		} 
		else {
			rev->handler(rev);
		}

		wev = c->write;
		if ((revents & EPOLLOUT) && wev->active) {
			if (flags & NGX_POST_THREAD_EVENTS) {
			wev->posted_ready = 1;
		}
		else {
			wev->ready = 1;
		}
		if (flags & NGX_POST_EVENTS) {
			ngx_locked_post_event(wev, &ngx_posted_events);
		}
		else {
			wev->handler(wev);
		}
	}
	//  
	ngx_mutex_unlock(ngx_posted_events_mutex);
	return NGX_OK;
}
 
///Debuugトレース
次に,単一プロセスとマルチプロセスで1つのhttpリクエストを処理する例として,イベント処理の流れを解析する.私はnginxの中にあるngxを使います.log_debugX()は、イベント処理の主要関数ngx_を挿入するepoll_process_events()とngx_event_process_posted().コンパイルするときは、「--with-debug」パラメータを追加する必要があります.nginxを指定します.confの「error_log logs/debug.log debug_core|debug_event|debug_http;」です.nginxを再起動します.
単一プロセス(work_processes 1):
1.初期化すなわちngx_worker_process_Init()でngx_を2回呼び出すepoll_add_event().1回目はngx_event_process_Init()では、各リスニングポート(私の例では80ポートのみリスニング)にNGX_を追加します.READ_イベントイベント2回目はngx_add_channel_event()は、プロセス間で通行するsocketpairに追加されます.
NGX_READ_イベントイベント.
2.ngx_を呼び出し続けるepoll_process_events()関数は、リスニングされたイベントが発生するかどうかを検出します.httpリクエストが入ってくると、epollのイベントがトリガーされます.以前に各リスニングポートにhandlerがngx_であることが設定されていたためevent_accept()は、
ngx_epoll_process_events()でrev->handler(rev)を呼び出すと呼び出されます
ngx_event_accept().この関数では、accept()が呼び出されます.すなわち、要求を受信し、新しい接続を割り当て、この新しい接続を初期化し、listening socketのhandler、すなわちls->handler(c)を呼び出します.ls->handlerはhttpにいるからblock()(構成を読み込んだ後)には既に設定されています(ls->handler=ngx_http_init_connection;)ngx_が呼び出されますhttp_init_connection().この関数では、読み取りイベントが追加され、処理フックがngx_であるように設定されます.http_init_request().
3.epollが新しいイベント呼び出しをトリガーする
ngx_http_init_request()は、httpリクエスト処理の各段階を継続します.(process request line,process headers,各phaseなど)
4.最後にclientは接続を閉じました(Linuxの下のcurlを使っています).ngx_が呼び出されましたhttp_finalize_request() => ngx_http_finalize_connection() => ngx_http_set_keepalive().
ngx_http_set_keepalive()関数設定イベントの処理関数はngx_http_keepalive_handler()を呼び出し、ngx_を呼び出します.post_event()ngx_に追加posted_eventsキューにあります.そしてngx_event_process_posted()関数は、キュー内のすべてのイベントを処理して削除します.にある
ngx_http_keepalive_handler()関数でngx_を呼び出します.http_close_connection() => ngx_close_connection() => ngx_del_conn(c,NGX_CLOSE_EVENT).ngx_del_conn()すなわちngx_epoll_del_connection()は,この処理要求のconnectionをepollが傍受するイベントリストから削除する.
マルチプロセス(work_processes 2を設定しました):単一プロセスとは異なり、単一プロセスはepoll timerを-1に設定します.つまり、リスニングされたポートが要求を受信するまでイベントなしにブロックされます.マルチプロセスは異なり、各プロセスにepoll_が設定されます.wait()のtimeoutは,傍受ポートで要求を受ける権利を順番に取得しようと試み,イベントがなければ他のイベントを処理し,取得すればブロックする(
*任意のイベントが発生するまで)
1.ngx_event_process_Init()では、呼び出すだけです
ngx_add_channel_event()は、httpリスニングのポートにイベントを追加するのではなく、プロセス間通信のsocketpairにイベントを追加します(要求を同時に受け入れる複数のワークプロセスがないことを保証するために).各プロセスがfork()されると、親プロセスはngx_を呼び出します.pass_open_channel() => ngx_write_channel()=>sendmsg()は、すでに存在するすべてのプロセスに通知する(これは、ngx_channel_handler()関数を呼び出す受信者のイベントをトリガーする)
2.ngx_process_events_and_timers()では、すべてのプロセスngx_を同期するためにロックを使用します.trylock_accept_mutex()は、ngx_を得るプロセスが1つしかありません.accept_mutexというロック.このロックを取得したプロセスはngx_を呼び出します.enable_accept_events()リスニングポートのイベントを追加します.
3.ngx_epoll_process_events()でngx_が呼び出されましたlocked_post_event()は、accept queue(すなわち、ngx_posted_accept_events)に読み取りイベントを追加し、ngx_event_process_posted()内処理、すなわち呼び出し
ngx_event_accept()を選択し、読み取りイベントを追加します(後は単一プロセスと同じです).処理完了ngx_posted_accept_eventsキュー内のすべてのacceptイベントの後、ngx_accept_mutexというロックも解放され、要求を受け入れる権利を他のプロセスに譲る.
*マルチプロセスのモードでは、新しいサブプロセスが開始されるたびに、親プロセスは残りのすべてのプロセスのsocketpair channelに新しいサブプロセスのchannelをブロードキャストします.これにより、以前にリスニングポート権限(すなわち、ngx_accept_mutex)を取得したプロセスがepollイベントをトリガーし、ngx_を解放するaccept_mutexは、初期化フェーズ(その後、サブプロセス間では一般的に通信しない)で発生するが、epollにリスニングポートイベントを追加する2つ以上のプロセスは一般的に発生しない.しかし、理論的には、このような設計は、システムのバグを引き起こす可能性がある(例えば、サブプロセスに信号を送信する方法によっていくつかの特殊な機能を実現する人がいる場合、そのうちの1つのプロセスがngx_accept_mutexを放棄し、別の1つのプロセスがその後ngx_accept_mutexを再び取得する可能性がある).