Nginxのupstreamモジュールと逆エージェント(一)


Nginxは、同時接続を処理する能力に優れているため、逆プロキシサーバとして使用されることが増えています.リバースエージェントとは,Nginxをユーザアクセスの最先端に置き,ユーザからの要求を傍受し,対応するバックエンドサーバに転送して具体的な要求を処理することである.バックエンドサーバは、Squidなどのキャッシュサーバ、またはapache/Nginx/lighttpdなどの動的/静的リクエストを処理するサーバであってもよく、ここでは詳細には説明しない.本稿では,upstreamサーバにおけるNginxの設定と「proxy_pass」の機能をコードレベルで分析し,実際の生産環境での問題点について議論する.
upstreamモジュールには、主に2つのコマンド、「upstream」と「server」があります.プロファイルには、次のような構成があります.
http{
	#...
	upstream backend_name{
		server xx.xx.xx.xx:xx weight=2 max_fails=3;
		server www.xxx.com weight=1;
		server unix://xxx/xxx;
		#...
	}
	#...
}
 
ここで「http」コマンドが解析されるとngx_が呼び出されますhttp_block()関数ですが、この関数でプロファイルが再解析され、「upstream」コマンドが見つかり、set()、すなわちngx_が呼び出されます.http_upstream()関数.この関数は次のとおりです.
ngx_http_upstream(...)
{
	//  upstream       , upstream     
	//...
	//         ngx_http_upstream_srv_conf_t  ,   umcf->upstreams    
	//  :          ,   "proxy_pass"    umcf->upstreams  match upstream    (              ><)
	uscf = ngx_http_upstream_add(...);
	ctx = ngx_pcalloc(cf->pool, sizeof(ngx_http_conf_ctx_t));
	//  main_conf
	http_ctx = cf->ctx;
	ctx->main_conf = http_ctx->main_conf;
	//    srv_conf
	ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module);
	// ngx_http_upstream_add()   ngx_http_upstream_srv_conf_t  srv_conf
	ctx->srv_conf[ngx_http_upstream_module.ctx_index] = uscf;
	//  uscf     srv_conf,         srv_conf,     srv_conf
	uscf->srv_conf = ctx->srv_conf;
	//  loc_conf
	ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module);
	//for each NGX_HTTP_MODULE
	for (;;){
		//if this module has create_srv_conf() ...
		mconf = module->create_srv_conf(cf);
		ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf;
		//if this module has create_loc_conf() ...
		mconf = module->create_loc_conf(cf);
		ctx->loc_conf[ngx_modules[m]->ctx_index] = mconf;	
	}
	cf->ctx = ctx;
	cf->cmd_type = NGX_HTTP_UPS_CONF;
	//    server   
	rv = ngx_conf_parse(cf, NULL);
}
 
「server」の構成行の解析を続行し、各行に1回呼び出します.
ngx_http_upstream_server(...)
{
	ngx_http_upstream_server_t  *us;
	us = ngx_array_push(uscf->servers);
	//       , url
	u.url = value[1];
	u.default_port = 80;
	ngx_parse_url(cf->pool, &u);
	//  "server"       
	//...
	//  us
	us->addrs = u.addrs;
	us->naddrs = u.naddrs;
	us->weight = weight;
	us->max_fails = max_fails;
	us->fail_timeout = fail_timeout;
}
 
ngx_http_core_moduleのserver{...}blockの/location{...}blockでは、「proxy_pass」コマンドがあれば、set()、すなわちngx_を呼び出します.http_proxy_pass():
ngx_http_proxy_pass(...)
{
	ngx_http_proxy_loc_conf_t *plcf = conf;
	//     location,    location   "proxy_pass"  
	clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
	//  loc handler,  clcf->handler  ngx_http_update_location_config()    r->content_handler,   NGX_HTTP_CONTENT_PHASE      handler, ngx_http_proxy_handler。
	clcf->handler = ngx_http_proxy_handler;
	
	//  "proxy_pass"     ( url)     
	//...
	//  http/https  port add(  )
	//...

	u.url.len = url->len - add;//  url   ,  http://(https://)
	u.url.data = url->data + add;//  url,  http://(https://),     "http://backend1",    "backend1"
	u.default_port = port;//  port
	u.uri_part = 1;
	u.no_resolve = 1;//  resolve  url   

	//        upstream          match  
	plcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
	
	//  plcf   
	//...
	//  location   
	plcf->location = clcf->name;
	//...
}
 
では、特定のlocationにアクセスするリクエストがある場合(このlocationがproxy_passコマンドを構成していると仮定)、他のリクエストと同様に各phaseのcheckerとhandlerが呼び出され、NGX_HTTP_CONTENT_PHASEのchecker、すなわちngx_http_core_content_phase()の場合、r->content_が呼び出されますhandler(r)、すなわち
ngx_http_proxy_handler
.
ngx_http_proxy_handler(r)
{
	ngx_http_upstream_t        *u;

	//      ngx_http_upstream_t,   r->upstream
	//...
	ngx_http_upstream_create(r);
	//  r->ctx[ngx_http_proxy_module.ctx_index] = ctx
	ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_proxy_ctx_t));
	ngx_http_set_ctx(r, ctx, ngx_http_proxy_module);
	//  ngx_http_proxy_loc_conf_t
	plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);
	
	//...
	
	//  upstream server   ,   、   ip、port 
	u->conf = &plcf->upstream;
	//               (       )
	u->create_request = ngx_http_proxy_create_request;
	//             ( create_request        )   
	u->reinit_request = ngx_http_proxy_reinit_request;
	//             bit,                  
	u->process_header = ngx_http_proxy_process_status_line;
	//              
	u->abort_request = ngx_http_proxy_abort_request;
	// Nginx                 
	u->finalize_request = ngx_http_proxy_finalize_request;
	
	//...
	u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t));
	u->pipe->input_filter = ngx_event_pipe_copy_input_filter;

	ngx_http_read_client_request_body(r, ngx_http_upstream_init);
	//  special response
	return NGX_DONE;//-4
}
 
ngx_http_read_client_request_body()でpost_が呼び出されますhandler()すなわちngx_http_upstream_init():
ngx_http_upstream_init(r)
{
	//...
	//   edge trigger    (epoll/kqueue),    NGX_WRITE_EVENT
	//                  
	ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT);
	ngx_http_upstream_init_request(r);
}
 
ngx_http_upstream_init_request(r)
{
	ngx_http_upstream_t  *u;
	u = r->upstream;
	//  r->read_event_handler r->write_event_handler       
	r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
	r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
	//     request  (    ,headers)  r->upstream      
	// r->upstream->url     url
	//   (     ,headers)  r->upstream->request_bufs  buf chain
	u->create_request(r);

	clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
	//   u->output(ngx_output_chain_ctx_s  )
	u->output.alignment = clcf->directio_alignment;
	u->output.pool = r->pool;
	u->output.bufs.num = 1;
	u->output.bufs.size = clcf->client_body_buffer_size;
	u->output.output_filter = ngx_chain_writer;
	u->output.filter_ctx = &u->writer;
	u->writer.pool = r->pool;

	//  state(ngx_http_upstream_state_t  )
	//...

	//    cleanup    
	ngx_http_cleanup_add(r,0);
	cln->handler = ngx_http_upstream_cleanup;
	cln->data = r;
	u->cleanup = &cln->handler;

	//  ngx_http_upstream_srv_conf_t
	uscf = u->conf->upstream;
	ngx_http_upstream_connect(r,u);
}
 
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
	u->state = ngx_array_push(r->upstream_states);
	//    ,  u->state
	//...

	//    socket,  ngx_get_connection()       connection    
	//   connection       , bind()  connect() peer
	//      ,    ,     EINPROGRESS,       
	rc = ngx_event_connect_peer(&u->peer);
	
	c = u->peer.connection;
	c->data = r;
	c->write->handler = ngx_http_upstream_handler;
	c->read->handler = ngx_http_upstream_handler;
	u->write_event_handler = ngx_http_upstream_send_request_handler;
	u->read_event_handler = ngx_http_upstream_process_header;
	c->sendfile &= r->connection->sendfile;
	u->output.sendfile = c->sendfile;
	//init or reinit the ngx_output_chain() and ngx_chain_writer() contexts
	//...

	//rc == NGX_AGAIN,     
	ngx_add_timer(c->write, u->conf->connect_timeout);
	
	//rc == NGX_OK,    
	ngx_http_upstream_send_request();
}
 
戻ってからngx_に入りましたhttp_finalize_request() --> ngx_http_finalize_connection() --> ngx_http_close_request()ですが、クライアントの接続は閉じません.その後、EPOLLOUTイベント(ユーザーとの接続で発生)をトリガーし、ngx_に進みます.http_request_handler().このイベントはngx_http_upstream_Init()に追加されました.このイベント処理関数はngx_http_process_request()に設定します.この関数は簡単です.
ngx_http_request_handler(ngx_event_t *ev)
{
	c = ev->data;
	r = c->data;
	//if...
	r->write_event_handler(r);
	//else...
	r->read_event_handler(r);
	//subrequest...
	ngx_http_run_posted_requests(c);
}
 
ngx_http_upstream_init_request()にr->read_が設定されていますevent_handlerとr->write_event_handler、ngx_を指しますhttp_upstream_check_broken_接続すると、上の関数で、この関数が呼び出されます.主に、ユーザーとの接続が切断されているかどうかを確認します.
ngx_http_upstream_check_broken_connection(r,ev)
{
	c = r->connection;
	u = r->upstream;

	n = recv(c->fd, buf, 1, MSG_PEEK);
	//...
	//  err   NGX_EAGAIN(11)
	err = ngx_socket_errno;
	//...
}
 
処理が完了すると、もう一つのイベントも同時にトリガーされ(epoll処理イベントループで)、upstreamと接続された書き込みイベント(upstreamサーバに送信されます).c->write->handlerすなわちngx_を呼び出すhttp_upstream_handler().これは前にngx_http_upstream_connect()に設定されている:
ngx_http_upstream_handler(ev)
{
	c = ev->data;
	r = c->data;
	u = r->upstream;
	c = r->connection;
	//if...
	u->write_event_handler(r, u);
	//else...
	u->read_event_handler(r, u);
	//subrequest...
	ngx_http_run_posted_requests(c);
}
 
から
ngx_http_upstream_connect()は
u->write_event_handler = ngx_http_upstream_send_request_handler、この関数は主にngx_を呼び出します.http_upstream_send_request()関数:
ngx_http_upstream_send_request(r,u)
{
	c = u->peer.connection;
	//       
	rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
	u->write_event_handler = ngx_http_upstream_dummy_handler;
}
 
バックエンド(upstream)サーバがリクエストを処理すると、イベントがトリガーされます.再呼び出し
ngx_http_upstream_handler()、今回の呼び出し
u->read_event_handler、すなわち
ngx_http_upstream_process_header():
ngx_http_upstream_process_header(r,u)
{
	//...

	for ( ;; ) {
		n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
		//u->process_header = ngx_http_proxy_process_status_line
		//    ngx_http_proxy_process_header(),     headers
		rc = u->process_header(r);
	}
	ngx_http_upstream_process_headers();
	//  subrequest_in_memory == 0
	ngx_http_upstream_send_response(r, u);

	//else...
	//...
}
 
ngx_http_upstream_send_response(r,u)
{
	//  ngx_http_top_header_filter ngx_http_header_filter()
	//ngx_http_header_filter()    ngx_http_write_filter()
	//ngx_http_write_filter()    chain,        
	rc = ngx_http_send_header(r);

	//...

	u->read_event_handler = ngx_http_upstream_process_upstream;
	r->write_event_handler = ngx_http_upstream_process_downstream;
	
	ngx_http_upstream_process_upstream(r, u);
}
 
ngx_http_upstream_process_upstream(r,u)
{
	c = u->peer.connection;
	//    timeout
	//
	ngx_event_pipe(u->pipe, 0);
}