这次主要来分析当upstream发送过来数据之后,nginx是如何来处理。不过这里我忽略了cache部分,以后我会专门来分析nginx的cache部分。

在前面blog我们能得知upstream端的读回调函数是ngx_http_upstream_process_header,因此这次我们就从ngx_http_upstream_process_header的分析开始。

下面是ngx_http_upstream_process_header执行的流程图.

upstream_process_header

首先,需要分配内存用来接收后端的数据,这里这个buffer就是u->buffer,在fastcgi或者proxy中,我们可以通过fastcgi_buffer_size或者proxy_buffer_size对这个值进行设置,它的初始大小是页的大小。

还有一个要注意的就是u->headers_in,这个头的含义是这样子的,由于upstream是一个通用的组件,因此它不知道后端的协议,而对于client来说,由于http是需要header的,而后端的协议不一定有头,此时就需要我们通过解析后端的协议,然后来设置好发送给client的头,最终发送给client。

因此此时就需要我们在自己写的upstream模块根据和后端的通信协议来解析数据(就是process_header回调函数),然后将解析好的数据(header)转换为http的头,此时这些头就是保存在u->headers_in中(详细可以看下fastcgi和process_header或者proxy的).。这里要注意u->headers_in的类型和request里面的类型可是不一样的。不过它和r->headers_in类似,都是将一些常用的头直接放到域里面,而把所有的头都放到u->headers_in.headers这个list中,因此这里就需要初始化u->headers_in.headers这个list.而具体这些是如何做的,我们后面会详细分析.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
      
if (u->buffer.start == NULL) {

//分配一块buffer,可以看到大小为u->conf->buffer_size(fastcgi_buffer_size或者proxy_buffer_size)

u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);

if (u->buffer.start == NULL) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

//然后初始化对应的域

u->buffer.pos = u->buffer.start;

u->buffer.last = u->buffer.start;

u->buffer.end = u->buffer.start + u->conf->buffer_size;

u->buffer.temporary = 1;

u->buffer.tag = u->output.tag;

//初始化headers.

if (ngx_list_init(&u->headers_in.headers, r->pool, 8,

sizeof(ngx_table_elt_t))

!= NGX_OK)

{

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

#if (NGX_HTTP_CACHE)

if (r->cache) {

u->buffer.pos += r->cache->header_start;

u->buffer.last = u->buffer.pos;

}

#endif

}

接下来就是从upstream读取数据,判断返回值,处理错误,如果一切正常,则调用u->process_header,这个也就是我们写upstream模块时,挂载的process_header回调,一般来说,这个回调主要是解析upstream读到数据,得到后端传递过来的头,然后设置u->headers_in中相关的域.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
      
for ( ;; ) {

//接收数据

n = c->recv(c, u->buffer.last, u->buffer.end – u->buffer.last);

if (n == NGX_AGAIN) {

#if 0

ngx_add_timer(rev, u->read_timeout);

#endif

if (ngx_handle_read_event(c->read, 0) != NGX_OK) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

return;

}

//如果为0,则说明upstream已经关闭了连接

if (n == 0) {

ngx_log_error(NGX_LOG_ERR, c->log, 0,

"upstream prematurely closed connection");

}

if (n == NGX_ERROR || n == 0) {

ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);

return;

}

//更新buffer

u->buffer.last += n;

#if 0

u->valid_header_in = 0;

u->peer.cached = 0;

#endif

//然后调用挂载的回调函数

rc = u->process_header(r);

//如果返回again,则说明后端的数据发送不完全,此时需要再次读取.

if (rc == NGX_AGAIN) {

if (u->buffer.pos == u->buffer.end) {

ngx_log_error(NGX_LOG_ERR, c->log, 0,

"upstream sent too big header");

ngx_http_upstream_next(r, u,

NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);

return;

}

continue;

}

break;

}

ok,接下来我会以fastcgi中的process_header的代码片段来分析当调用了u->process_header之后,都发生了什么事情。

在看这个之前,我们先来看u->headers_in的结构.,可以看到它和r->header_out(ngx_http_headers_out_t)很类似,只不过更简单一些。后面我们可以看到为什么这么类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
  
typedef struct {

//保存了所有的将要传递给client的头

ngx_list_t headers;

//这里用来设置发送给client的 状态码

ngx_uint_t status_n;

ngx_str_t status_line;

//下面这些头是为了更方便的存取

ngx_table_elt_t *status;

ngx_table_elt_t *date;

ngx_table_elt_t *server;

ngx_table_elt_t *connection;

ngx_table_elt_t *expires;

ngx_table_elt_t *etag;

ngx_table_elt_t *x_accel_expires;

ngx_table_elt_t *x_accel_redirect;

ngx_table_elt_t *x_accel_limit_rate;

ngx_table_elt_t *content_type;

ngx_table_elt_t *content_length;

ngx_table_elt_t *last_modified;

ngx_table_elt_t *location;

ngx_table_elt_t *accept_ranges;

ngx_table_elt_t *www_authenticate;

#if (NGX_HTTP_GZIP)

ngx_table_elt_t *content_encoding;

#endif

off_t content_length_n;

ngx_array_t cache_control;

} ngx_http_upstream_headers_in_t;

然后来看fastcgi的代码片段,下面这段代码就是fastcgi解析upstream中读取的数据,然后将解析到的头设置到u->headers_in中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
          
for ( ;; ) {

part_start = u->buffer.pos;

part_end = u->buffer.last;

//pase header

rc = ngx_http_parse_header_line(r, &u->buffer, 1);

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

"http fastcgi parser: %d", rc);

if (rc == NGX_AGAIN) {

break;

}

//到达这里说明一个header已经被解析出来了.

if (rc == NGX_OK) {

/\* a header line has been parsed successfully \*/

//此时从headers list里面取出一个table。

h = ngx_list_push(&u->headers_in.headers);

if (h == NULL) {

return NGX_ERROR;

}

if (f->split_parts && f->split_parts->nelts) {

………………………………………………………………………..

} else {

//开始构造头,将解析好的指针赋值给h

h->key.len = r->header_name_end – r->header_name_start;

h->value.len = r->header_end – r->header_start;

h->key.data = ngx_pnalloc(r->pool,

h->key.len + 1 + h->value.len + 1

+ h->key.len);

if (h->key.data == NULL) {

return NGX_ERROR;

}

h->value.data = h->key.data + h->key.len + 1;

h->lowcase_key = h->key.data + h->key.len + 1

+ h->value.len + 1;

//开始复制值

ngx_cpystrn(h->key.data, r->header_name_start,

h->key.len + 1);

ngx_cpystrn(h->value.data, r->header_start,

h->value.len + 1);

}

h->hash = r->header_hash;

if (h->key.len == r->lowcase_index) {

ngx_memcpy(h->lowcase_key, r->lowcase_header, h->key.len);

} else {

ngx_strlow(h->lowcase_key, h->key.data, h->key.len);

}

//先从umcf->headers_in_hash中查找,这个hash我们紧接着就会分析。

hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,

h->lowcase_key, h->key.len);

//如果存在则调用hh->handler,接下来会分析这个.

if (hh && hh->handler(r, h, hh->offset) != NGX_OK) {

return NGX_ERROR;

}

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

"http fastcgi header: \"%V: %V\"",

&h->key, &h->value);

if (u->buffer.pos < u->buffer.last) {

continue;

}

/\* the end of the FastCGI record \*/

break;

}

上面的代码有两个疑问,一个是umcf->headers_in_hash,一个是hh->handler,接下来就来分析这两个东西。

主要是http有一些头,我们会经常用到,或者有一些头,nginx需要忽略掉(Connection, 因为nginx不支持后端的http 1.1).因此nginx就将这些头特殊处理,常用到的放到固定的域,以便与存取,忽略掉的,则直接赋值为空。

而在nginx中就构造了一个静态数组ngx_http_upstream_headers_in(而umcf->headers_in_hash里面就是这个数组的元素),这个数组是ngx_http_upstream_header_t类型的.下面我们先来看这个类型的定义。

这个结构有两个需要注意的域,一个是handler,一个是copy_handler,这两个回调的区别是这样子的。

1
2
3
4
  
typedef ngx_int_t (\*ngx_http_header_handler_pt)(ngx_http_request_t \*r,

ngx_table_elt_t *h, ngx_uint_t offset);

handler回调用于将传递进来的头(ngx_table_elt_t)赋值(只是改变指针)给ngx_http_upstream_headers_in_t中对应的域。

而copy_handler用于将头赋值给r->header_out,也就是发送给client的头。而可以看到r->header_out中的大部分header 域的偏移和u->header_in的偏移是一样的,这样我们赋值给r->header_out就是非常简单了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  
typedef struct {

//头的名字

ngx_str_t name;

ngx_http_header_handler_pt handler;

//在ngx_http_upstream_headers_in_t的偏移

ngx_uint_t offset;

ngx_http_header_handler_pt copy_handler;

ngx_uint_t conf;

ngx_uint_t redirect; /\* unsigned redirect:1; \*/

} ngx_http_upstream_header_t;

然后我们来看ngx_http_upstream_headers_i这个数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  
ngx_http_upstream_header_t ngx_http_upstream_headers_in[] = {

{ ngx_string("Status"),

ngx_http_upstream_process_header_line,

offsetof(ngx_http_upstream_headers_in_t, status),

ngx_http_upstream_copy_header_line, 0, 0 },

{ ngx_string("Content-Type"),

ngx_http_upstream_process_header_line,

offsetof(ngx_http_upstream_headers_in_t, content_type),

ngx_http_upstream_copy_content_type, 0, 1 },

&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;

}

从上面看,这里用的最多的回调就是ngx_http_upstream_process_header_line和ngx_http_upstream_copy_header_line,其它的和他们的功能类似,因此这里我们就详细分析这两个回调,看看里面都是怎么做的。

ngx_http_upstream_process_header_line的做法很简单,取得r->upstream->headers_in的指针,然后通过传递进来的偏移来确定header的位置指针,最后将h赋值给它。

这里可以看到如果已经存在则忽略后面的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  
static ngx_int_t

ngx_http_upstream_process_header_line(ngx_http_request_t \*r, ngx_table_elt_t \*h,

ngx_uint_t offset)

{

ngx_table_elt_t **ph;

//取得header指针

ph = (ngx_table_elt_t *\*) ((char \*) &r->upstream->headers_in + offset);

if (*ph == NULL) {

//赋值

*ph = h;

}

return NGX_OK;

}

然后是copy handler,它的实现也很简单,就是从r->headers_out.headers取出来头(这是因为header要在headers和对应的域各保存一个指针),然后赋值给对应偏移的指针.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  
static ngx_int_t

ngx_http_upstream_copy_header_line(ngx_http_request_t \*r, ngx_table_elt_t \*h,

ngx_uint_t offset)

{

ngx_table_elt_t \*ho, \**ph;

//取出header

ho = ngx_list_push(&r->headers_out.headers);

if (ho == NULL) {

return NGX_ERROR;

}

\*ho = \*h;

//如果offset存在,则赋值.

if (offset) {

ph = (ngx_table_elt_t *\*) ((char \*) &r->headers_out + offset);

*ph = ho;

}

return NGX_OK;

}

copy_handler是什么时候调用的呢,我们接着分析下面的代码。让我们回到ngx_http_upstream_process_header,来看它后面的代码。

下面的代码就是调用完u->process_header之后的处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
      
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {

ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);

return;

}

if (rc == NGX_ERROR) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

/\* rc == NGX_OK \*/

//错误处理.

if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) {

if (r->subrequest_in_memory) {

u->buffer.last = u->buffer.pos;

}

if (ngx_http_upstream_test_next(r, u) == NGX_OK) {

return;

}

if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {

return;

}

}

//执行后续工作,主要是设置将要发送给client的header(r->header_out).接下来会详细分析.

if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {

return;

}

if (!r->subrequest_in_memory) {

//然后发送response到client端.

ngx_http_upstream_send_response(r, u);

return;

}

//下面是subrequst的代码,暂时跳过

&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;.

然后来看ngx_http_upstream_process_headers,这个函数会对一个x_accel_redirect的头进行特殊处理,这个头主要是nginx提供了一种机制,让后端的server能够控制访问权限。比如后端限制某个页面不能被用户访问,那么当用户访问这个页面的时候,后端server只需要设置X-Accel-Redirect这个头到一个路径,然后nginx将会输出这个路径的内容给用户.

来看这部分的代码片段.下面这部分就是处理X-Accel-Redirect这个头,首先拷贝header,然后取出X-Accel-Redirect头后面的地址进行内部重定向.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
      
if (u->headers_in.x_accel_redirect

&& !(u->conf->ignore_headers & NGX_HTTP_UPSTREAM_IGN_XA_REDIRECT))

{

ngx_http_upstream_finalize_request(r, u, NGX_DECLINED);

//便利headers

part = &u->headers_in.headers.part;

h = part->elts;

for (i = 0; /\* void \*/; i++) {

if (i >= part->nelts) {

if (part->next == NULL) {

break;

}

part = part->next;

h = part->elts;

i = 0;

}

//如果在ngx_http_upstream_headers_in中存在,并且这个头当redirect之后,还是不变的,此时则调用copy_handler.

hh = ngx_hash_find(&umcf->headers_in_hash, h[i].hash,

h[i].lowcase_key, h[i].key.len);

if (hh && hh->redirect) {

if (hh->copy_handler(r, &h[i], hh->conf) != NGX_OK) {

ngx_http_finalize_request(r,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return NGX_DONE;

}

}

}

//取出uri

uri = &u->headers_in.x_accel_redirect->value;

ngx_str_null(&args);

flags = NGX_HTTP_LOG_UNSAFE;

//parse

if (ngx_http_parse_unsafe_uri(r, uri, &args, &flags) != NGX_OK) {

ngx_http_finalize_request(r, NGX_HTTP_NOT_FOUND);

return NGX_DONE;

}

if (r->method != NGX_HTTP_HEAD) {

r->method = NGX_HTTP_GET;

}

r->valid_unparsed_uri = 0;

//内部重定向

ngx_http_internal_redirect(r, uri, &args);

ngx_http_finalize_request(r, NGX_DONE);

return NGX_DONE;

}

接下来就是没有X-Accel-Redirect头的情况.这个时候,前部分和上面处理类似,首先从ngx_http_upstream_headers_in查找,如果存在则调用copy_handler,然后再调用ngx_http_upstream_copy_header_line将剩余的头copy到r->header_out.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
      
part = &u->headers_in.headers.part;

h = part->elts;

//开始遍历

for (i = 0; /\* void \*/; i++) {

if (i >= part->nelts) {

if (part->next == NULL) {

break;

}

part = part->next;

h = part->elts;

i = 0;

}

//查找hash,如果是需要hide的头,则continue

if (ngx_hash_find(&u->conf->hide_headers_hash, h[i].hash,

h[i].lowcase_key, h[i].key.len))

{

continue;

}

//否则hash查找

hh = ngx_hash_find(&umcf->headers_in_hash, h[i].hash,

h[i].lowcase_key, h[i].key.len);

if (hh) {

//调用copy_header

if (hh->copy_handler(r, &h[i], hh->conf) != NGX_OK) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return NGX_DONE;

}

continue;

}

//最后copy剩下的header.

if (ngx_http_upstream_copy_header_line(r, &h[i], 0) != NGX_OK) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return NGX_DONE;

}

}

这次的分析就到这里,下一次我将会详细分析 upstream最复杂的一块,也就是发送数据到client的部分(ngx_http_upstream_send_response).