这次主要来分析upstream中的发送数据给client, 以及当buf不足,将一部分写到temp file的部分,他们对应的函数分别是ngx_event_pipe_write_to_downstream和ngx_event_pipe_write_chain_to_temp_file.

先来看ngx_event_pipe_write_to_downstream,这个函数顾名思义,就是写buf到临时文件。而所写的buf就是p->in,也就是将要发送给client的数据。

这个函数,它会处理两类的情况,一类是cache打开,一类是cache未打开。我们这里主要来分析cache关闭的情况。

首先来看这个函数的第一部分的代码,这部分代码主要是遍历p->in,然后计算能写多少buf到文件(temp file的size是有限制的).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
  
//out就是将要保存到file的数据

if (p->buf_to_file) {

//cache打开的情况

fl.buf = p->buf_to_file;

fl.next = p->in;

out = &fl;

} else {

//得到数据

out = p->in;

}

//如果cache没有打开

if (!p->cacheable) {

size = 0;

cl = out;

ll = NULL;

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,

"pipe offset: %O", p->temp_file->offset);

//开始遍历out

do {

//计算大小

bsize = cl->buf->last – cl->buf->pos;

……………………………………………..

//看是否超过限制限制

if ((size + bsize > p->temp_file_write_size)

|| (p->temp_file->offset + size + bsize > p->max_temp_file_size))

{

break;

}

size += bsize;

ll = &cl->next;

cl = cl->next;

} while (cl);

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

if (ll == NULL) {

return NGX_BUSY;

}

//cl存在则说明只有一部分buf能够写入到temp file,此时p->in保存剩下的chain

if (cl) {

p->in = cl;

*ll = NULL;

} else {

//否则说明所有的buf都写入到了temp file,此时p->in则设置为空

p->in = NULL;

p->last_in = &p->in;

}

} else {

//cache打开的情况,可以看到和上面类似.

p->in = NULL;

p->last_in = &p->in;

}

然后是第二部分,也就是最后一部分,这部分就是写buf到temp file,然后将已经写到temp file的buf,挂载到free_raw_bufs,以供继续使用。这里有用到shadow buf,因为shadow buf的内存是已经分配好的,而当已经写入到temp file之后,这部分buf自然就可以重新使用了。

还有一个很重要的操作,那就是将已经写入到temp file的buf 挂载到p->out上.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
  
//写out到tempfile

if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {

return NGX_ABORT;

}

//遍历free_raw_bufs,以便与接下来将使用过的buf挂载到后面。

for (last_free = &p->free_raw_bufs;

*last_free != NULL;

last_free = &(*last_free)->next)

{

/\* void \*/

}

if (p->buf_to_file) {

p->temp_file->offset = p->buf_to_file->last – p->buf_to_file->pos;

p->buf_to_file = NULL;

out = out->next;

}

//遍历out

for (cl = out; cl; cl = next) {

next = cl->next;

cl->next = NULL;

b = cl->buf;

//可以看到重新设置buf为file。然后设置相关属性

b->file = &p->temp_file->file;

b->file_pos = p->temp_file->offset;

p->temp_file->offset += b->last – b->pos;

b->file_last = p->temp_file->offset;

b->in_file = 1;

b->temp_file = 1;

//这里就是将buf放入到p->out.

if (p->out) {

*p->last_out = cl;

} else {

p->out = cl;

}

//设置last_out

p->last_out = &cl->next;

//shadow存在,则将已经保存到file的buf挂载到free_raw_buf中。

if (b->last_shadow) {

tl = ngx_alloc_chain_link(p->pool);

if (tl == NULL) {

return NGX_ABORT;

}

//可以看到使用它的shadow

tl->buf = b->shadow;

tl->next = NULL;

//last_free就是free_raw_buf的尾部

*last_free = tl;

last_free = &tl->next;

//reset buf

b->shadow->pos = b->shadow->start;

b->shadow->last = b->shadow->start;

//remove掉shadow。

ngx_event_pipe_remove_shadow_links(b->shadow);

}

}

然后来看ngx_event_pipe_write_to_downstream,也就是发送数据到client的部分,这个函数整体是一个大循环, 而在这个大循环内分为三部分,其中第一部分处理upstream已经发送完毕(比如断开连接,出错等)时的情况,第二部分是对发送前的buf进行一些处理(比如busy buf,p->in,p->out等),然后循环发送,第三部分就是调用发送接口(output_filter)发送数据,然后update chain.

ok,接下来我们就来看这三部分。先来看第一部分,这部分就是处理upstream端已经发送完毕的情况,此时需要我们立即发送数据到downstream。

这里需要注意的是在upstream发送的时候,对于buf的选择有一个顺序,通过前面一篇blog,我们能看到当拷贝从upstream的数据的时候,就有一个顺序。

这个顺序是这样子的,首先是p->out,然后是p->in,因为p->out保存了一些buf在文件中的buf。

然后时buf的recycle属性,这个属性主要目的是这样子的,由于在p->input_filter中,nginx制造了一个buf充当已经读取的buf的shadow(last_shadow = 1),而在以后,nginx操作的都是这个buf,这个buf就被设置为recycled,也就是可以循环利用。而且当设置了recycled,则也将会在发送数据的时候,立即将buf发送出去,而不会缓存(可以看ngx_http_write_filter中的判断).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
  
//判断upstream状态

if (p->upstream_eof || p->upstream_error || p->upstream_done) {

/\* pass the p->out and p->in chains to the output filter \*/

//取消掉recycled设置,

for (cl = p->busy; cl; cl = cl->next) {

cl->buf->recycled = 0;

}

//首先发送p->out.

if (p->out) {

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,

"pipe write downstream flush out");

//取消掉recycled设置,因为已经不要回收这个buf了(后端不会有数据过来)。

for (cl = p->out; cl; cl = cl->next) {

cl->buf->recycled = 0;

}

//调用filter函数

rc = p->output_filter(p->output_ctx, p->out);

//如果发送失败,则设置downstream_error,并且回收chain

if (rc == NGX_ERROR) {

p->downstream_error = 1;

return ngx_event_pipe_drain_chains(p);

}

p->out = NULL;

}

//发送p->in

if (p->in) {

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,

"pipe write downstream flush in");

//取消recycled设置

for (cl = p->in; cl; cl = cl->next) {

cl->buf->recycled = 0;

}

//发送数据

rc = p->output_filter(p->output_ctx, p->in);

//同上

if (rc == NGX_ERROR) {

p->downstream_error = 1;

return ngx_event_pipe_drain_chains(p);

}

p->in = NULL;

}

//cache相关设置

if (p->cacheable && p->buf_to_file) {

file.buf = p->buf_to_file;

file.next = NULL;

if (ngx_write_chain_to_temp_file(p->temp_file, &file)

== NGX_ERROR)

{

return NGX_ABORT;

}

}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,

"pipe write downstream done");

/\* TODO: free unused bufs \*/

p->downstream_done = 1;

break;

}

紧接着是第二部分,这部分主要是处理upstream的数据并没有发送完全,此时nginx会尽量发送最大的可能的数据到client。

这里busy就保存了上次发送没有发送完毕的chain,它主要是为了方便统计。

它的步骤是这样子的,首先会计算busy chain的大小,因为我们有busy chain的限制(有busy buf的命令).然后计算p->in/p->out,最后得到一个最大的chain,然后发送。这里要注意,我们发送的每个buf大小是不会大于busy buf的大小的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
  
//判断是否需要退出循环,这里比较关键的就是downstream->write->ready,当发送返回again,就会从这里退出

if (downstream->data != p->output_ctx

|| !downstream->write->ready

|| downstream->write->delayed)

{

break;

}

prev = NULL;

bsize = 0;

//首先计算busy的chain的大小

for (cl = p->busy; cl; cl = cl->next) {

if (cl->buf->recycled) {

//如果是相同的chain,则跳过计算

if (prev == cl->buf->start) {

continue;

}

//计算大小

bsize += cl->buf->end – cl->buf->start;

prev = cl->buf->start;

}

}

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,

"pipe write busy: %uz", bsize);

out = NULL;

//如果bsize大于我们设置的busy buf size,则直接发送数据

if (bsize >= (size_t) p->busy_size) {

flush = 1;

goto flush;

}

flush = 0;

ll = NULL;

prev_last_shadow = 1;

//然后开始处理p->out和p->in

for ( ;; ) {

if (p->out) {

cl = p->out;

//计算将要发送的buf是否大于我们设置的busy_size,而cl->buf->last – cl->buf->pos是肯定小于等于busy_size.

if (cl->buf->recycled

&& bsize + cl->buf->last – cl->buf->pos > p->busy_size)

{

//此时当前的cl就不能发送,所以设置flush,然后立即发送

flush = 1;

break;

}

p->out = p->out->next;

//将shadow buf 放到free_raw_bufs中,以便后面使用

ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);

} else if (!p->cacheable && p->in) {

cl = p->in;

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,

"pipe write buf ls:%d %p %z",

cl->buf->last_shadow,

cl->buf->pos,

cl->buf->last – cl->buf->pos);

//类似上面的,也是需要计算buf大小,这里可以看到我们只操作影子(shadow)buf,

if (cl->buf->recycled

&& cl->buf->last_shadow

&& bsize + cl->buf->last – cl->buf->pos > p->busy_size)

{

if (!prev_last_shadow) {

//设置out chain

p->in = p->in->next;

cl->next = NULL;

if (out) {

*ll = cl;

} else {

out = cl;

}

}

flush = 1;

break;

}

prev_last_shadow = cl->buf->last_shadow;

p->in = p->in->next;

} else {

break;

}

//如果cl是recycled,则说明这个buf会被发送,因此bsize更新

if (cl->buf->recycled) {

bsize += cl->buf->last – cl->buf->pos;

}

cl->next = NULL;

//将对应的chain绑定到out上,接下来就会发送out。

if (out) {

*ll = cl;

} else {

out = cl;

}

ll = &cl->next;

}

然后是最后一部分,也就是发送chain,然后update chain的操作(类似chain output的操作)

这里用有一个很重要的操作,通过上面的分析我们知道,在upstream中,基本上每个buf都会有一个shadow,而我们发送的时候,用的是shadow(last_shadow=1),当发送完毕,这个buf会被放到p->free中,此时还有一个buf,那就是原始的buf(这个buf是有分配内存的,b->start不为null),因此这里就可以重用这个buf,在nginx中会将发送完毕后的这个buf放到free_raw_buf中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
      
flush:

………………………………….

//发送数据到client

rc = p->output_filter(p->output_ctx, out);

//错误的话,设置downstream_error

if (rc == NGX_ERROR) {

p->downstream_error = 1;

return ngx_event_pipe_drain_chains(p);

}

//update chain,将已经完全发送的chain保存到free,还没发送的保存到busy.

ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);

//遍历free chain,

for (cl = p->free; cl; cl = cl->next) {

if (cl->buf->temp_file) {

if (p->cacheable || !p->cyclic_temp_file) {

continue;

}

/\* reset p->temp_offset if all bufs had been sent \*/

if (cl->buf->file_last == p->temp_file->offset) {

p->temp_file->offset = 0;

}

}

/\* TODO: free buf if p->free_bufs && upstream done \*/

/\* add the free shadow raw buf to p->free_raw_bufs \*/

//将shadow的buf放到p->free_raw_buf中.

if (cl->buf->last_shadow) {

//可以看到这里操作的是cl->buf->shadow,也就是我们在p->input_filter中,拷贝的那个原始buf。

if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {

return NGX_ABORT;

}

cl->buf->last_shadow = 0;

}

cl->buf->shadow = NULL;

}

上面的分析,还遗漏了一个函数,那就是ngx_event_pipe_drain_chains,这个函数被调用,说明client出错,此时则需要将对应的shadow buf放到free_raw_buf中(调用(ngx_event_pipe_add_free_buf).

最后还有一个问题没解决,那就是当client断开连接时(当接收完毕所有的header),nginx如何来处理,通过上面的代码,我们能够看到,当发送数据失败时,只是调用ngx_event_pipe_drain_chains然后返回,那么其实这里处理和nginx处理一般的http请求是一样的,那就是暂时忽略client的断开,也就是不管client的状态,当upstream发送完毕,才会认为request处理完成,这样子能简化代码的处理。

并且,这里只有cache没有打开的时候,才会finalize 当前的request,这是因为当cache打开的时候,当前的request 是需要被cache,然后下次请求再次到达,就可以发送cache的了。所以此时需要接受完后端所有的数据。

下面我们就来看这部分代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
  
static void

ngx_http_upstream_process_request(ngx_http_request_t *r)

{

ngx_uint_t del;

ngx_temp_file_t *tf;

ngx_event_pipe_t *p;

ngx_http_upstream_t *u;

u = r->upstream;

p = u->pipe;

//当finalize upstream之后,connection将会被赋值为NULL

if (u->peer.connection) {

if (u->store) {

del = p->upstream_error;

tf = u->pipe->temp_file;

if (p->upstream_eof || p->upstream_done) {

if (u->headers_in.status_n == NGX_HTTP_OK

&& (u->headers_in.content_length_n == -1

|| (u->headers_in.content_length_n == tf->offset)))

{

ngx_http_upstream_store(r, u);

} else {

del = 1;

}

}

if (del && tf->file.fd != NGX_INVALID_FILE) {

if (ngx_delete_file(tf->file.name.data) == NGX_FILE_ERROR) {

ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno,

ngx_delete_file_n " \"%s\" failed",

u->pipe->temp_file->file.name.data);

}

}

}

//如果upstream端发送完毕(断开等),则finalize request。

if (p->upstream_done || p->upstream_eof || p->upstream_error) {

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

"http upstream exit: %p", p->out);

ngx_http_upstream_finalize_request(r, u, 0);

return;

}

}

//如果downstream error设置,则进入下面的处理

if (p->downstream_error) {

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

"http upstream downstream error");

//这里可以看到如果cache没有打开,则finalize 当前的request

if (!u->cacheable && !u->store && u->peer.connection) {

ngx_http_upstream_finalize_request(r, u, 0);

}

}

}