前几天nginx的0.8.x正式成为stable,然后看了下代码,发现0.8加入了linux native aio的支持,我们知道在linux下有两种aio,一种是glibc实现的aio,这个比较烂,它是直接在用户空间用pthread进行模拟的。还有一种就是内核实现的aio,这些系统调用是以io_xxx开始的,而在nginx的0.8 中使用的是后一种,下面我们简称后一种为native aio.这里注意native aio只支持direct io。

而native aio的优点就是能够同时提交多个io请求给内核,然后直接由内核的io调度算法去处理这些请求(direct io),这样的话,内核就有可能执行一些合并,优化。

native aio包含下面几个系统调用:

1
2
  
io_setup(2),io_cancel(2), io_destroy(2), io_getevents(2), io_submit(2)

要使用他们必须安装libaio这个库,这个库也就是简单的封装了上面的几个系统调用,而nginx中没有使用libaio这个库,而是直接使用syscall来调用系统调用。现在先大概介绍下这几个api,io_setup用于建立一个aio的环境,io_cancel用于删除一个提交的句柄的任务。io_getevents用于得到任务执行完毕之后的事件信息。io_submit用于提交任务。详细的api介绍去看man手册。

然后就来看nginx是如何使用它的。这里nginx是这样处理的,它利用了系统调用eventfd,用eventfd建立一个句柄,然后将这个句柄加入到epoll中监听,然后在io_submit提交任务的时候,将aio_flags设置为IOCB_FLAG_RESFD,并且aio_resfd设置为eventfd建立的那个句柄,这样当io请求完成后,会象aio_resfd中写入完成的请求数,然后此时epoll就接到可读通知,从而进行后续操作。

首先是在ngx_epoll_init中,这里主要是建立aio的环境.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
  
#if (NGX_HAVE_FILE_AIO)

{

int n;

struct epoll_event ee;

//调用eventfd

ngx_eventfd = syscall(SYS_eventfd, 0);

if (ngx_eventfd == -1) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,

"eventfd() failed");

return NGX_ERROR;

}

n = 1;

//设置异步

if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,

"ioctl(eventfd, FIONBIO) failed");

}

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,

"eventfd: %d", ngx_eventfd);

//安装aio的环境

n = io_setup(1024, &ngx_aio_ctx);

if (n != 0) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, -n, "io_setup() failed");

return NGX_ERROR;

}

//设置将要传递给epoll的数据,可以看到都是和eventfd关联的

ngx_eventfd_event.data = &ngx_eventfd_conn;

//这个就是当eventfd可读被通知后,epoll所将要执行的读方法。

ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;

ngx_eventfd_event.log = cycle->log;

ngx_eventfd_event.active = 1;

ngx_eventfd_conn.fd = ngx_eventfd;

ngx_eventfd_conn.read = &ngx_eventfd_event;

ngx_eventfd_conn.log = cycle->log;

//检测可读事件

ee.events = EPOLLIN|EPOLLET;

ee.data.ptr = &ngx_eventfd_conn;

//加入到epoll中。

if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) == -1) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,

"epoll_ctl(EPOLL_CTL_ADD, eventfd) failed");

return NGX_ERROR;

}

}

#endif

}

然后是提交任务,这个动作是在ngx_file_aio_read中进行的。它主要是设置io_submit所需要的参数,然后传递io请求给io_submit.这里有个关键的就是aio_flags的设置,这个标记说明我们要使用aio_resfd来接收aio执行的结果。其实也就是执行的io任务的个数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  
ngx_memzero(&aio->aiocb, sizeof(struct iocb));

aio->aiocb.aio_data = (uint64_t) (uintptr_t) ev;

//这里设置是读,nginx只使用到了异步读取

aio->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;

aio->aiocb.aio_fildes = file->fd;

aio->aiocb.aio_buf = (uint64_t) (uintptr_t) buf;

aio->aiocb.aio_nbytes = size;

aio->aiocb.aio_offset = offset;

//设置标记

aio->aiocb.aio_flags = IOCB_FLAG_RESFD;

aio->aiocb.aio_resfd = ngx_eventfd;

ev->handler = ngx_file_aio_event_handler;

piocb[0] = &aio->aiocb;

//提交请求

n = io_submit(ngx_aio_ctx, 1, piocb);

最后是当io请求完成后,nginx的回调函数如何处理的。前面的代码分析我们知道回调函数是ngx_epoll_eventfd_handler,因此我们就来看这个函数,它的流程比较简单,首先从eventfd中读取返回的事件个数,然后调用io_getevents来获得所完成的io请求事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
  
static void

ngx_epoll_eventfd_handler(ngx_event_t *ev)

{

int n;

long i, events;

uint64_t ready;

ngx_err_t err;

ngx_event_t *e;

ngx_event_aio_t *aio;

struct io_event event[64];

struct timespec ts;

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "eventfd handler");

//开始读取完成的

n = read(ngx_eventfd, &ready, 8);

err = ngx_errno;

…………………………………………………..

ts.tv_sec = 0;

ts.tv_nsec = 0;

while (ready) {

//用来得到所完成event,events就是事件的个数,而event则是一个事件数组

events = io_getevents(ngx_aio_ctx, 1, 64, event, &ts);

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,

"io_getevents: %l", events);

if (events > 0) {

ready -= events;

//遍历event,然后处理事件.

for (i = 0; i < events; i++) {

ngx_log_debug4(NGX_LOG_DEBUG_EVENT, ev->log, 0,

"io_event: %uXL %uXL %L %L",

event[i].data, event[i].obj,

event[i].res, event[i].res2);

e = (ngx_event_t *) (uintptr_t) event[i].data;

e->complete = 1;

e->active = 0;

e->ready = 1;

aio = e->data;

aio->res = event[i].res;

//post事件

ngx_post_event(e, &ngx_posted_events);

}

continue;

}

&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;&#8230;..

}

}