NFingx ustream(一)全体の流れ分析
51670 ワード
NFIx上流サーバへのアクセスの流れは、大きく分けて以下の段階に分けられます.アップストリームを起動し、上流サーバに接続し、上流へ要求を送信し、上流応答(包頭/包体)を受信し、要求を終了します.本編では主にコードフローの観点から、アップストリーム全体のデータ処理の流れを整理します.次にアップストリーム関連の2つの重要なデータ構造を見てみます.http_アップストリームtとngx_http_アップストリームconft:関連データ構造
ここをクリックして折り畳みまたは開く
ここをクリックして折り畳みまたは開く
ここをクリックして折り畳みまたは開く
ここをクリックして折り畳みまたは開く
ここをクリックして折り畳みまたは開く
ここをクリックして折り畳みまたは開く
typedef struct ngx_http_upstream_s ngx_http_upstream_t;
struct ngx_http_upstream_s {
ngx_http_upstream_handler_pt read_event_handler; //
ngx_http_upstream_handler_pt write_event_handler; //
ngx_peer_connection_t peer; // ,
ngx_event_pipe_t *pipe; // , pipe , http upstream pipe
ngx_chain_t *request_bufs; // ngx_buf_t , ,
// create_request request_buf
ngx_output_chain_ctx_t output; // ,
ngx_chain_writer_ctx_t writer; // ,
ngx_http_upstream_conf_t *conf; // upstream
#if (NGX_HTTP_CACHE)
ngx_array_t *caches; // ,
#endif
ngx_http_upstream_headers_in_t headers_in; // ,process_header http , headers_in
ngx_http_upstream_resolved_t *resolved; // ,
ngx_buf_t from_client; // ToDo....
ngx_buf_t buffer; // , buffering 0 ,
off_t length; //
ngx_chain_t *out_bufs; // ,
ngx_chain_t *busy_bufs; // buffering 0 ,
ngx_chain_t *free_bufs; // buffering 0 , out_bufs ngx_buf_t
ngx_int_t (*input_filter_init)(void *data); // , data , input_filter_ctx
ngx_int_t (*input_filter)(void *data, ssize_t bytes)// ,bytes ,data
void *input_filter_ctx; // http
#if (NGX_HTTP_CACHE)
ngx_int_t (*create_key)(ngx_http_request_t *r); // cache ,
#endif
ngx_int_t (*create_request)(ngx_http_request_t *r); //
ngx_int_t (*reinit_request)(ngx_http_request_t *r); // , ,
ngx_int_t (*process_header)(ngx_http_request_t *r); // ,NGX_AGAIN ,NGX_OK
void (*abort_request)(ngx_http_request_t *r); //
void (*finalize_request)(ngx_http_request_t *r, // ,
ngx_int_t rc);
ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r,// Location Refresh ,process_header http
ngx_table_elt_t *h, size_t prefix);
ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r, // , Set-Cookie , http
ngx_table_elt_t *h);
ngx_msec_t timeout; //
ngx_http_upstream_state_t *state; // 、
ngx_str_t method; // ,
ngx_str_t schema; //
ngx_str_t uri; //
ngx_http_cleanup_pt *cleanup; // , ,
unsigned store:1; //
unsigned cacheable:1; //
unsigned accel:1; //
unsigned ssl:1; // SSL
unsigned buffering:1; // ,
unsigned keepalive:1; // keepalive ?
unsigned upgrade:1; // upgrade header
unsigned request_sent:1; //
unsigned header_sent:1; // 1 ,
}
ngx_http_アップストリームconft:アップストリームの運転方式を指定しましたので、アップストリームを起動する前に設定しなければなりません.ここをクリックして折り畳みまたは開く
typedef struct {
ngx_http_upstream_srv_conf_t *upstream; // resolved ,
ngx_msec_t connect_timeout; // tcp ,
ngx_msec_t send_timeout; // ,
ngx_msec_t read_timeout; // ,
ngx_msec_t timeout; //
ngx_msec_t next_upstream_timeout; //
size_t send_lowat; // , TCP SO_SNOLOWAT
size_t buffer_size; // , buffering 0 , buffer ,
size_t limit_rate; //
size_t busy_buffers_size; // buffering 1, , ngx_event_pipe_t busy_size
size_t max_temp_file_size; // , ngx_event_pipe_t temp_file
size_t temp_file_write_size; // ,
......
ngx_bufs_t bufs; //
ngx_uint_t ignore_headers; // headers
ngx_uint_t next_upstream; // , ,
ngx_uint_t store_access; //
ngx_uint_t next_upstream_tries; //
ngx_flag_t buffering; // 1 , , 0
......
ngx_flag_t ignore_client_abort; // 1 , nginx , ,
ngx_flag_t intercept_errors; // ngx_http_upstream_intercept_errors
ngx_flag_t cyclic_temp_file; // 1 ,
......
ngx_path_t *temp_path; // buffering 1 ,
ngx_hash_t hide_headers_hash; // , hide_headers pass_headers http
ngx_array_t *hide_headers; // , ,
ngx_array_t *pass_headers; // upstream , ,
ngx_http_upstream_local_t *local; // ,
ngx_array_t *store_lengths; // ,
ngx_array_t *store_values; // ,
......
signed store:2; // ngx_http_upstream_t store
unsigned intercept_404:1; // 1, 404 , error_page
unsigned change_buffering:1; // 1 , , ,
......
ngx_str_t module; // upstream ,
} ngx_http_upstream_conf_t
ubstreamを起動して、要請を受けたらhttpの代理モジュールはngx_です.http_proxymodule,そのNGXHTTP_CONTENT_PHASE段階の処理関数はngx_です.http_proxyハンドルここをクリックして折り畳みまたは開く
static ngx_int_t
ngx_http_proxy_handler(ngx_http_request_t *r)
{
// ngx_http_upstream_t , r->upstream
if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
.....
plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module);
.....
u = r->upstream;
.....
// upstream conf ,
u->conf = &plcf->upstream;
//
u->create_request = ngx_http_proxy_create_request;
u->reinit_request = ngx_http_proxy_reinit_request;
u->process_header = ngx_http_proxy_process_status_line;
u->abort_request = ngx_http_proxy_abort_request;
u->finalize_request = ngx_http_proxy_finalize_request;
......
u->buffering = plcf->upstream.buffering;
.....
// ngx_http_upstream_init
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
.....
return NGX_DONE;
}
まずアップストリームの構造を作成して設定して、ngx_を設定します.http_アップストリームconft構造体を配置し、アップストリーム->confを与える.ngx_http_アップストリームinit関数はngx_によります.http_アップストリームconft設定の情報は、ubstreamを初期化し、上流サーバとの接続を開始することにより、ubstream全体の処理フローを展開する.ここをクリックして折り畳みまたは開く
void ngx_http_upstream_init(ngx_http_request_t *r)
{
ngx_connection_t *c;
//
c = r->connection;
......
// upstream , ,
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
......
ngx_http_upstream_init_request(r);
}
ドンクスを見続けますhttp_アップストリームinit_request関数はここをクリックして折り畳みまたは開くことができます.
static void ngx_http_upstream_init_request(ngx_http_request_t *r)
{
u = r->upstream;
u->store = u->conf->store;
......
// Nginx TCP , ,ignore ,
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
......
// http create_request , ngx_http_proxy_create_request ,
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
......
// main cleanup
cln = ngx_http_cleanup_add(r, 0);
// handler ngx_http_upstream_cleanup
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
......
// ngx_http_upstream_connect
ngx_http_upstream_connect(r, u);
}
アップストリームサーバとの接続体制を確立し、アップストリームサーバとの間にtcpを介して接続を確立し、3回の握手の間にプロセスをブロックしないようにするために、NFIxはブロックなしのソケットを採用して、上流サーバと接続します.ngx_http_アップストリームconnectは建設を開始し、すぐに成功に戻らない場合、epollでこのソケットを監視する必要があり、書き込み可能なイベントが発生した場合、接続が確立されたと説明します.ここをクリックして折り畳みまたは開く
static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
// .....
rc = ngx_event_connect_peer(&u->peer);
....
ここをクリックして折り畳みまたは開く
ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
// tcp socket
s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);
......
// ngx_connection_t , ngx_cycle_t free_connections
c = ngx_get_connection(s, pc->log);
......
//
if (ngx_nonblocking(s) == -1) {
......
//
if (pc->local) {
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
......
//
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
// sendfile
c->sendfile = 1;
......
rev = c->read;
wev = c->write;
......
pc->connection = c;
// ngx_event_actions.add_conn tcp 、 , epoll
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
// , ,
rc = connect(s, pc->sockaddr, pc->socklen);
......
ngx_に戻るhttp_アップストリームconnectを続けて分析します.ここをクリックして折り畳みますか?開けます.
static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// ,
rc = ngx_event_connect_peer(&u->peer);
......
c = u->peer.connection;
c->data = r;
// connection , ngx_http_upstream_handler
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
// upstream write_event_handler read_event_handler, ngx_upstream_handler
// ngx_http_upstream_send_request_handler
u->write_event_handler = ngx_http_upstream_send_request_handler;
// ngx_http_upstream_process_header
u->read_event_handler = ngx_http_upstream_process_header;
......
if (rc == NGX_AGAIN) {
// , epoll , , ngx_http_upstream_conf_t connect_timeout
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
......
// , , : ,
ngx_http_upstream_send_request(r, u);
}
まず簡単にconnectionの読みと書き戻しの関数を見ます.http_アップストリームハンドルをクリックして折り畳みまたは開く
static void
ngx_http_upstream_handler(ngx_event_t *ev)
{
......
// data ngx_connection_t , nginx
c = ev->data;
// data ngx_http_request_t
r = c->data;
// upstream upstream ngx_http_upstream_t
u = r->upstream;
// ngx_http_request_t connection nginx
c = r->connection;
......
if (ev->write) {
// nginx tcp ,
u->write_event_handler(r, u);
} else {
// nginx tcp ,
u->read_event_handler(r, u);
}
// nginx_http_request_handler , post
ngx_http_run_posted_requests(c);
}
上流サーバに要求を送信する前に、ngx_を紹介しています.http_アップストリームconnect関数の時、私達はngx_を見ました.http_アップストリームtの中のwrite_イベントハンドルはngx_に設定されています.http_アップストリームsend_request_ハンドル、そしてngx_http_アップストリームconnectの最後に直接ngx_を呼び出しました.http_アップストリームsend_requestは要求を送ります.まず両者の違いを見てみます.ここをクリックして折り畳みます.
static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_connection_t *c;
// ngx_connection_t
c = u->peer.connection;
// timeout 1 , http
if (c->write->timedout) {
// next ,next : upstream , upstream
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
......
// header_sent 1 , ,
if (u->header_sent) {
// , , write ( )
u->write_event_handler = ngx_http_upstream_dummy_handler;
// epoll
(void) ngx_handle_write_event(c->write, 0);
return;
}
// http
ngx_http_upstream_send_request(r, u);
}
上の分析を通して、今は両者の違いが分かりやすくなりました.http_アップストリームsend_request_ハンドルは要求の状態を検出していますが、実際の送信関数はngx_です.http_アップストリームsend_requestは、この関数を続けて見ます.ここをクリックして折り畳みまたは開く
static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// u->request_bufs , , request_bufs
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
// , ngx_output_chain, , request_bufs, NULL
u->request_sent = 1;
......
// , , ngx_output_chain
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
// ngx_output_chain NGX_AGAIN , ,
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout);
// epoll
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// ngx_http_upstream_send_request , epoll
return;
}
/* rc == NGX_OK */
// ngx_output_chain NGX_OK , ,
......
u->write_event_handler = ngx_http_upstream_dummy_handler;
// epoll
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// , ,
ngx_add_timer(c->read, u->conf->read_timeout);
// ready , , process_header
if (c->read->ready) {
ngx_http_upstream_process_header(r, u);
return;
}
}
上流サーバの応答Nginxを受信するuplostreamメカニズムは、応答パケットを転送しない、応答を転送しない場合は、下流のネットワーク速度で優先し、応答を転送する場合は、上流のネットワーク速度で優先します.ngxをするhttp_request_t構造体のsubrequest_同前memoryフラグビットが1の場合、転送応答はありません.subrequest_同前memoryが0の場合、応答を転送します.そしてngx_http_アップストリームconft構成構造におけるバfferingが0の場合は、下流のネットワーク速度を優先し、固定サイズのメモリをキャッシュとして使用する.bufferingが1の場合、上流のネットワーク速度を優先して、より多くのメモリ、ハードディスクファイルをキャッシュとして採用する.以下では、応答ヘッダを受信、解析するためのngx_を見ます.http_アップストリームprocess.headerメソッドは、(ここで)折り畳みまたは開くをクリックします.
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
//
c = u->peer.connection;
......
// , , ngx_http_upstream_next
if (c->read->timedout) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
// request_sent 1 ; 0 , , ,
// ngx_http_upstream_next ,
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
// buffer, start NULL , , buffer_size
if (u->buffer.start == NULL) {
u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
if (u->buffer.start == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// ,
.......
}
for ( ;; ) {
// buffer , buffer_size,
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
// NGX_AGAIN , epoll ,
if (n == NGX_AGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
// , next ,
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
// n 0 , , last n ,last start , buffer
u->buffer.last += n;
//
rc = u->process_header(r);
// process_header , NGX_AGAIN , , buffer_size
if (rc == NGX_AGAIN) {
// buffer , next
if (u->buffer.last == u->buffer.end) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
......
}
// process_header , , , next
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
// process_header ERROR ,
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// , process_header ,
/* rc == NGX_OK */
.......
// , , ngx_http_request_t headers_out
// ngx_http_send_header , headers_out
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
// subrequest_in_memory 0 , ; 1 ,
if (!r->subrequest_in_memory) {
//
ngx_http_upstream_send_response(r, u);
return;
}
// , subrequest_in_memory 1
/* subrequest content in memory */
// input_filter NULL,input_filter , ,
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
// init , init ,
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// pos last
n = u->buffer.last - u->buffer.pos;
// process_header , , ,
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
// input_filter
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
......
//
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
//
ngx_http_upstream_process_body_in_memory(r, u);
}
以下の分析を続けます.ubrequest_はubrequestではなく、直接に応答を転送する時の具体的な処理の流れです.memoryが1のシーンである場合、この要求はサブ要求に属する.上の分析で言及したデフォルトのinput_を見てみます.filterの処理方法は、上の分析では、ヘッダを読み込むと同時にパッケージ情報を読み、input_を呼び出します.filter方法の処理:ここをクリックして折り畳みまたは開く
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
{
// data ngx_http_request_t , input_filter , input_filter_ctx , ngx_http_request_t
ngx_http_request_t *r = data;
......
u = r->upstream;
// out_bufs ll ->next
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
}
//
cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
}
// out_bufs
*ll = cl;
......
// buffer
b = &u->buffer;
// b->last , ,cl->buf->pos , b->last cl->buf->last
cl->buf->pos = b->last;
b->last += bytes;
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;
// ,
if (u->length == -1) {
return NGX_OK;
}
//
u->length -= bytes;
return NGX_OK;
}
引き続き下に分析して、process_header呼び出しinput_filterがパッケージを処理した後、最後に呼び出した関数はngx_http_アップストリームprocess.body_同前memoryは、実際に上流サーバのパッケージ内容を受信します.ここをクリックして折り畳みまたは開く
static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
//
c = u->peer.connection;
// , , ,
rev = c->read;
if (rev->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
return;
}
// buffer
b = &u->buffer;
for ( ;; ) {
//
size = b->end - b->last;
......
// , recv
n = c->recv(c, b->last, size);
// NGX_AGAIN
if (n == NGX_AGAIN) {
break;
}
// , ,
if (n == 0 || n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, n);
return;
}
//
u->state->response_length += n;
//
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (!rev->ready) {
break;
}
}
// ,
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
// Epoll
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// , read_timeout,
if (rev->active) {
ngx_add_timer(rev, u->conf->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
上の流れは、応答ヘッダを読み取るバッファerの空間が足りないため、処理に問題があるという問題が分かりやすいです.使う時のポイントはまだInput_にあります.filterメソッドにおけるbufferの管理.転送応答を行わないプロセスを分析した後、転送応答の2つの実施形態を引き続き見て、下流ネットワーク速度優先と上流ネットワーク速度優先の実現.上流ネットワークの速度を優先する方式のため、より複雑な実現が可能となります.下流ネットワークの速度を優先する方式、すなわち固定メモリサイズを採用し、応答バッファとして使用します.コードも不要なロジックを削除します.ダウンストリームのネットワーク速度を優先的にクリックします.折り畳みまたは開くことができます.
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// , process_header headers_in , upstream_process_headers headers_in
// headers_out ,ngx_http_send_header headers_out http
rc = ngx_http_send_header(r);
......
//
u->header_sent = 1;
......
// , , , ,
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
......
// buffering 1 , 0
if (!u->buffering) {
// input_filter, input_filter
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
//
u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
//
r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream;
r->limit_rate = 0;
// input_filter ,
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
......
// , , , ,
n = u->buffer.last - u->buffer.pos;
if (n) {
// last , response_length, input_filter
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// downstream
ngx_http_upstream_process_non_buffered_downstream(r);
} else {
// buff, pos last
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
// Todo....
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// , downstream
if (u->peer.connection->read->ready || u->length == 0) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}
// Nginx
return;
}
......
}
ngx_http_アップストリームprocess.non_buffered_downstream関数は、上流サーバの応答を処理するためのイベントのクリック(ここ)を折りたたみまたは開くために使用されます.
static void ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
{
// ,
c = r->connection;
u = r->upstream;
wev = c->write;
......
// , ,
if (wev->timedout) {
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
// non_buffered , , , 1 , 0
ngx_http_upstream_process_non_buffered_request(r, 1);
}
続いて、グクスクスクスクスを分析します.http_アップストリームprocess.non_buffered_requestをクリックして折り畳みまたは開く
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r, ngx_uint_t do_write)
{
// , downstream upstream
u = r->upstream;
downstream = r->connection;
upstream = u->peer.connection;
b = &u->buffer;
// ,do_write , u->length , 0
do_write = do_write || u->length == 0;
for ( ;; ) {
//
if (do_write) {
// out_bufs , busy_bufs out_bufs out_bufs, out_bufs
if (u->out_bufs || u->busy_bufs) {
// out_bufs ,busy_bufs out_bufs , out_bufs
rc = ngx_http_output_filter(r, u->out_bufs);
// out_bufs buf, buf busy_buf , out_bufs
ngx_chain_update_chains(r->pool, &u->free_bufs, &u->busy_bufs, &u->out_bufs, u->output.tag);
}
// busy_bufs ,
if (u->busy_bufs == NULL) {
......
// pos last
b->pos = b->start;
b->last = b->start;
}
}
// buffer
size = b->end - b->last;
//
if (size && upstream->read->ready) {
n = upstream->recv(upstream, b->last, size);
// NGX_AGAIN
if (n == NGX_AGAIN) {
break;
}
// n > 0 n
if (n > 0) {
// response_length
u->state->response_length += n;
// input_filter
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
// do_write , ,
do_write = 1;
continue;
}
break;
}
......
if (downstream->data == r) {
// epoll
if (ngx_handle_write_event(downstream->write, clcf->send_lowat) != NGX_OK)
......
}
// , , send_timeout
if (downstream->write->active && !downstream->write->ready) {
ngx_add_timer(downstream->write, clcf->send_timeout);
}
......
// epoll
if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// , , read_timeout
if (upstream->read->active && !upstream->read->ready) {
ngx_add_timer(upstream->read, u->conf->read_timeout);
}
......
}
上流ネットの速度を優先して上流ネットの速度を優先的に実現するのは複雑です.http_proxymoduleは上流ネットワーク速度優先の実現方式を採用しました.ngxをするhttp_アップストリームconftのBufferingを1に設定すると、上流ネットワークを優先する方式が必要であると説明します.この時はngx_が必要ですイベントpipe_t構造は、メモリ複製の問題を解決するために、上下の間に転送された応答パッケージを維持する.ここをクリックして折り畳みまたは開く
typedef struct ngx_event_pipe_s ngx_event_pipe_t;
struct ngx_event_pipe_s {
ngx_connection_t *upstream; //
ngx_connection_t *downstream; //
ngx_chain_t *free_raw_bufs; // ,
ngx_chain_t *in; // ,ngx_event_pipe_copy_input_filter buffer in
ngx_chain_t **last_in; //
ngx_chain_t *out; // ,
ngx_chain_t *free; //
ngx_chain_t *busy; // , out
/*
* the input filter i.e. that moves HTTP/1.1 chunks
* from the raw bufs to an incoming chain
*/
ngx_event_pipe_input_filter_pt input_filter; // ,
void *input_ctx; // input_filter , ngx_http_request_t
ngx_event_pipe_output_filter_pt output_filter; // , ngx_http_output_filter
void *output_ctx; // output_filter , ngx_http_request_t
unsigned read:1; // 1
unsigned cacheable:1; // 1
unsigned single_buf:1; // 1 ngx_buf_t
unsigned free_bufs:1; // 1 ,
unsigned upstream_done:1; // input_filter , Nginx
unsigned upstream_error:1; // , 1, ,
unsigned upstream_eof:1; // , 1
unsigned upstream_blocked:1; // , ,
unsigned downstream_done:1; // 1
unsigned downstream_error:1; // , 1
unsigned cyclic_temp_file:1; // 1
ngx_int_t allocated; // , bufs.num
ngx_bufs_t bufs; // ,bufs.size ,bufs.num
ngx_buf_tag_t tag; // 、 ngx_buf_t tag
ssize_t busy_size;
off_t read_length; //
off_t length; //
off_t max_temp_file_size; //
ssize_t temp_file_write_size; //
ngx_msec_t read_timeout; //
ngx_msec_t send_timeout; //
ssize_t send_lowat; // ,TCP
ngx_pool_t *pool; //
ngx_log_t *log; // ngx_log_t
ngx_chain_t *preread_bufs; // ,
size_t preread_size; // ,
ngx_buf_t *buf_to_file; //
size_t limit_rate; //
time_t start_sec; //
ngx_temp_file_t *temp_file; //
/* STUB */ int num; // ngx_buf_t
}
上流ネットワークの速度優先か下流ネットワークの速度優先かにかかわらず、応答の転送はngx_を通じて行われる.http_アップストリームsend_レスポンス関数で行いました.前でダウンストリームの速度を優先する部分の流れを分析しました.残りの部分を分析し続けます.ここをクリックして折り畳みまたは開く
static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
// r->headers_out
rc = ngx_http_send_header(r);
......
// , , ,
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
......
// buffering 1 , 0
if (!u->buffering) {
......
return ;
}
/* TODO: preallocate event_pipe bufs, look "Content-Length" */
// pipe upstream , , pipe
p = u->pipe;
//
p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
// pipe output_ctx ngx_http_request_t , pipe, pipe->output_ctx ngx_http_request_t
p->output_ctx = r;
// tag
p->tag = u->output.tag;
// bufs
p->bufs = u->conf->bufs;
// busy
p->busy_size = u->conf->busy_buffers_size;
// upstream nginx
p->upstream = u->peer.connection;
// downstream nginx
p->downstream = c;
//
p->pool = r->pool;
// log
p->log = c->log;
//
p->limit_rate = u->conf->limit_rate;
//
p->start_sec = ngx_time();
//
p->cacheable = u->cacheable || u->store;
//
p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (p->temp_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
//
p->temp_file->file.fd = NGX_INVALID_FILE;
p->temp_file->file.log = c->log;
p->temp_file->path = u->conf->temp_path;
p->temp_file->pool = r->pool;
......
//
p->max_temp_file_size = u->conf->max_temp_file_size;
//
p->temp_file_write_size = u->conf->temp_file_write_size;
// , , ngx_buf_t
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// ,( , )
p->preread_bufs->buf = &u->buffer;
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;
p->preread_size = u->buffer.last - u->buffer.pos;
.......
//
p->read_timeout = u->conf->read_timeout;
//
p->send_timeout = clcf->send_timeout;
// TCP send_lowat
p->send_lowat = clcf->send_lowat;
.......
//
u->read_event_handler = ngx_http_upstream_process_upstream;
//
r->write_event_handler = ngx_http_upstream_process_downstream;
//
ngx_http_upstream_process_upstream(r, u);
}
上流の応答イベントのプロcessを読み込むに関わらずubstreamは、クライアントにデータを書くプロシーズです.downstream、最後は全部ngx_を通します.イベントpipeはキャッシュ転送応答を実現する.次はグクスクスを見に来ます.イベントpipeの具体的な実現:ここをクリックして折りたたみまたは開く
ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
// do_write 1 , 0
for ( ;; ) {
if (do_write) {
// do_write 1, ,
rc = ngx_event_pipe_write_to_downstream(p);
......
// NGX_OK , ngx_event_pipe
}
......
//
if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
return NGX_ABORT;
}
// , , , do_write
if (!p->read && !p->upstream_blocked) {
break;
}
// , , , do_write
do_write = 1;
}
if (p->upstream->fd != (ngx_socket_t) -1) {
// epoll
if (ngx_handle_read_event(rev, flags) != NGX_OK) {
return NGX_ABORT;
}
//
if (!rev->delayed) {
if (rev->active && !rev->ready) {
ngx_add_timer(rev, p->read_timeout);
......
}
// epoll ,
if (p->downstream->fd != (ngx_socket_t) -1 && p->downstream->data == p->output_ctx)
{
wev = p->downstream->write;
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
return NGX_ABORT;
}
if (!wev->delayed) {
if (wev->active && !wev->ready) {
ngx_add_timer(wev, p->send_timeout);
.......
}
return NGX_OK;
}
上の関数で言及したngx_イベントpipe_read_.アップストリームは上流の応答を受信するために使用されます.ここをクリックして折り畳みまたは開く
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
......
for ( ;; ) {
// , , ,
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
break;
}
// preread_bufs NULL ,ready 0 ,
if (p->preread_bufs == NULL && !p->upstream->read->ready) {
break;
}
// preread_bufs , ,
if (p->preread_bufs) {
chain = p->preread_bufs;
// chain , preread_bufs,
p->preread_bufs = NULL;
n = p->preread_size;
// , read 1,
if (n) {
p->read = 1;
}
} else {
.......
} else {
limit = 0;
}
// free_raw_bufs ngx_event_pipe_read_upstream
if (p->free_raw_bufs) {
chain = p->free_raw_bufs;
if (p->single_buf) {
p->free_raw_bufs = p->free_raw_bufs->next;
chain->next = NULL;
} else {
p->free_raw_bufs = NULL;
}
// bufs.num,
} else if (p->allocated bufs.num) {
b = ngx_create_temp_buf(p->pool, p->bufs.size);
if (b == NULL) {
return NGX_ABORT;
}
p->allocated++;
chain = ngx_alloc_chain_link(p->pool);
if (chain == NULL) {
return NGX_ABORT;
}
chain->buf = b;
chain->next = NULL;
// , ready 1 , delay 0
// ready 1, delay 0 ,
} else if (!p->cacheable
&& p->downstream->data == p->output_ctx
&& p->downstream->write->ready
&& !p->downstream->write->delayed)
{
p->upstream_blocked = 1;
break;
// offset , , ,
// , write
} else if (p->cacheable
|| p->temp_file->offset max_temp_file_size)
{
/*
* if it is allowed, then save some bufs from p->in
* to a temporary file, and add them to a p->out chain
*/
// in temp_file , ngx_buf_t in , out
rc = ngx_event_pipe_write_chain_to_temp_file(p);
......
// recv_chain
n = p->upstream->recv_chain(p->upstream, chain, limit);
// free_raw_bufs
if (p->free_raw_bufs) {
chain->next = p->free_raw_bufs;
}
......
while (cl && n > 0) {
// , shadow
ngx_event_pipe_remove_shadow_links(cl->buf);
// , , , input_filter
size = cl->buf->end - cl->buf->last;
if (n >= size) {
// , , input_filter ngx_event_pipe_copy_input_filter , in
cl->buf->last = cl->buf->end;
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
return NGX_ABORT;
}
// ,
n -= size;
ln = cl;
cl = cl->next;
ngx_free_chain(p->pool, ln);
} else {
// , last ,n 0 , last
cl->buf->last += n;
n = 0;
}
}
// cl ( ?), cl NULL; cl , cl NULL
if (cl) {
for (ln = cl; ln->next; ln = ln->next) { /* void */ }
// p->free_raw_bufs NULL , p->free_raw_bufs
ln->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
}
......
}
......
// upstream_eof 1 ,upstream_error , free_raw_bufs
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
// input_filter
if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
return NGX_ABORT;
}
p->free_raw_bufs = p->free_raw_bufs->next;
// free_bufs 1 , ngx_pfree shadow
if (p->free_bufs && p->buf_to_file == NULL) {
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
if (cl->buf->shadow == NULL) {
ngx_pfree(p->pool, cl->buf->start);
......
}
.......
}
受信応答の処理手順を見てから、送信応答の処理フローを見てみます.対応する関数はngx_です.イベントpipe_write_to_downstreamここをクリックして折り畳みまたは開く
static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
......
for ( ;; ) {
if (p->downstream_error) {
return ngx_event_pipe_drain_chains(p);
}
//
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
// out
if (p->out) {
for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->out);
......
}
// in
if (p->in) {
for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->in);
......
}
//
p->downstream_done = 1;
break;
}
......
// busy
for (cl = p->busy; cl; cl = cl->next) {
if (cl->buf->recycled) {
......
bsize += cl->buf->end - cl->buf->start;
prev = cl->buf->start;
}
}
......
// busy_size , flush out
if (bsize >= (size_t) p->busy_size) {
flush = 1;
goto flush;
}
......
for ( ;; ) {
// out NULL, out
if (p->out) {
cl = p->out;
p->out = p->out->next;
// out , in
} else if (!p->cacheable && p->in) {
cl = p->in;
.....
} else {
break;
}
cl->next = NULL;
if (out) {
*ll = cl;
} else {
out = cl;
}
ll = &cl->next;
}
flush:
......
//
rc = p->output_filter(p->output_ctx, out);
// free、busy out
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
......
// free , shadow
for (cl = p->free; cl; cl = cl->next) {
......
if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
return NGX_ABORT;
}
cl->buf->last_shadow = 0;
}
cl->buf->shadow = NULL;
}
}
return NGX_OK;
}
いよいよ終了します.ubstreamの流れは複雑です.最後に見てください.ubstreamのお願いが完了しました.ubstreamの要求の終わりの流れは、3つの関数が入ってきます.http_アップストリームfinalize_request、ngx_http_アップストリームcleanup、ngx_http_アップストリームnextその中でcleanupとnextが本当にアップストリームを終了する時にはやはりfinalize_を呼び出します.request関数ngx_http_アップストリームcleanup関数は、up streamを起動すると、要求されたcleanupチェーンに掛けられます.HTTPフレームがhttp要求を終了すると、必ずup streamを呼び出します.cleanup関数ここをクリックして折り畳みまたは開く
static void ngx_http_upstream_cleanup(void *data)
{
ngx_http_request_t *r = data;
ngx_http_upstream_finalize_request(r, r->upstream, NGX_DONE);
}
アップストリームが見られます.cleanupの実現は、実は直接にngx_を呼び出しました.http_アップストリームfinalize_request、この流れは私たちが期待する閉鎖方式です.そしてngx_http_アップストリームnext関数は、サーバを再接続したり、新しいサーバを選択したりするなどのポリシーによって、サービスの利用可能性を高めるために、処理要求の流れにエラーが発生したときに自動的に呼び出されます.現在のnginxの負荷バランスの機能はnext関数によって実現されます.詳しく分析します.ここでは簡単に説明します.ここをクリックして折り畳みまたは開く
static void ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_uint_t ft_type)
{
......
if (status) {
u->state->status = status;
timeout = u->conf->next_upstream_timeout;
// tries 0 , upstream
if (u->peer.tries == 0
|| !(u->conf->next_upstream & ft_type)
|| (timeout && ngx_current_msec - u->peer.start_time >= timeout))
{
ngx_http_upstream_finalize_request(r, u, status);
return;
}
}
// ,
if (u->peer.connection) {
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
ngx_close_connection(u->peer.connection);
u->peer.connection = NULL;
}
//
ngx_http_upstream_connect(r, u);
}
最後にグクスクスを見ますhttp_アップストリームfinalize_requestの具体的な実現はクリックして折り畳みますかます開けますか?
static void ngx_http_upstream_finalize_request(ngx_http_request_t *r, ngx_http_upstream_t *u, ngx_int_t rc)
{
// cleanup NULL
if (u->cleanup) {
*u->cleanup = NULL;
u->cleanup = NULL;
}
//
if (u->resolved && u->resolved->ctx) {
ngx_resolve_name_done(u->resolved->ctx);
u->resolved->ctx = NULL;
}
......
// http finalize_request
u->finalize_request(r, rc);
//
if (u->peer.connection) {
if (u->peer.connection->pool) {
ngx_destroy_pool(u->peer.connection->pool);
}
ngx_close_connection(u->peer.connection);
}
u->peer.connection = NULL;
//
if (u->store && u->pipe && u->pipe->temp_file
&& u->pipe->temp_file->file.fd != NGX_INVALID_FILE)
{
if (ngx_delete_file(u->pipe->temp_file->file.name.data)
== NGX_FILE_ERROR)
......
}
......
// HTTP
ngx_http_finalize_request(r, rc);
}
ここでは、アップストリームの処理の流れを大体整理しました.今までに実現された負荷バランスの各種アルゴリズムと、Ngix cache機能について分析します.