这篇主要分析下mochiweb的整体结构。

我这里看的代码是github上的最新代码( https://github.com/mochi/mochiweb )。

由于mochiweb是一个框架,因此我们就从他自带的简单例子入手,来看他是如何工作的。我们就从keepalive.erl开始。

首先来看这个模块的start函数,也就是它的启动函数:

[erlang]

-define(LOOP, {?MODULE, loop}).

start(Options = [{port, _Port}]) ->

mochiweb_http:start([{name, ?MODULE}, {loop, ?LOOP} | Options]).

[/erlang]

可以看到启动函数非常简单,那就是直接调用mochiweb_http模块的start函数。那么我们整个分析流程,就从这个模块的start函数开始。这里要注意,我们可以看到是将当前回调模块的loop函数传递给了mochiweb_http,这就给后续调用回调,提供了接口。

[erlang]

%% @spec start(Options) -> ServerRet

%% Options = [option()]

%% Option = {name, atom()} | {ip, string() | tuple()} | {backlog, integer()}

%% | {nodelay, boolean()} | {acceptor_pool_size, integer()}

%% | {ssl, boolean()} | {profile_fun, undefined | (Props) -> ok}

%% | {link, false}

%% @doc Start a mochiweb server.

%% profile_fun is used to profile accept timing.

%% After each accept, if defined, profile_fun is called with a proplist of a subset of the mochiweb_socket_server state and timing information.

%% The proplist is as follows: [{name, Name}, {port, Port}, {active_sockets, ActiveSockets}, {timing, Timing}].

%% @end

start(Options) ->

mochiweb_socket_server:start(parse_options(Options)).

[/erlang]

这里会调用mochiweb_socket_server的start函数启动mochiweb server,可是在调用之前会调用parse_options来解析。

[erlang]

parse_options(Options) ->

{loop, HttpLoop} = proplists:lookup(loop, Options),

Loop = {?MODULE, loop, [HttpLoop]},

Options1 = [{loop, Loop} | proplists:delete(loop, Options)],

mochilists:set_defaults(?DEFAULTS, Options1).

[/erlang]

这里由于Options是一个tuple list,因此这里调用proplists的lookup函数对整个tuple list进行查找,其中key就是loop,它会和每个tuple的第一个元素比较,如果相等就会返回当前的tuple,下面就是proplists的lookup函数的实现:

[erlang]

lookup(Key, [P | Ps]) ->

if is_atom(P), P =:= Key ->

{Key, true};

%%关键在这里,可以看到会取出tuple的第一个元素和key比较.

tuple_size(P) >= 1, element(1, P) =:= Key ->

%% Note that Key does not have to be an atom in this case.

P;

true ->

lookup(Key, Ps)

end;

lookup(_Key, []) ->

none.

[/erlang]

当解析出来loop回调之后,就进入了mochiweb_socket_server模块的处理,这个模块是一个gen_server.我们来看它的start函数:

[erlang]

start_link(Options) ->

start_server(start_link, parse_options(Options)).

start(Options) ->

case lists:keytake(link, 1, Options) of

{value, {_Key, false}, Options1} ->

start_server(start, parse_options(Options1));

_ ->

%% TODO: https://github.com/mochi/mochiweb/issues/58

%% [X] Phase 1: Add new APIs (Sep 2011)

%% [_] Phase 2: Add deprecation warning

%% [_] Phase 3: Change default to {link, false} and ignore link

%% [_] Phase 4: Add deprecation warning for {link, _} option

%% [_] Phase 5: Remove support for {link, _} option

start_link(Options)

end.

[/erlang]

可以看到link是否存在,最终调用调用parse_options(mochiweb_socket_server)来pase对应的参数,然后再调用start_server函数。这里parse_options就不详细介绍了,只需要知道它是用来解析参数(ip,port…)的就够了。我们详细来看start_server.

[erlang]

start_server(F, State=#mochiweb_socket_server{ssl=Ssl, name=Name}) ->

ok = prep_ssl(Ssl),

case Name of

undefined ->

gen_server:F(?MODULE, State, []);

_ ->

gen_server:F(Name, ?MODULE, State, [])

end.

[/erlang]

可以看到start_server会直接调用gen_sever的start函数,也就是启动当前gen server,不过我们注意到这里调用的是start,而不是start_link,这是因为在mochiweb中,并没有实现supervision行为的模块,而是在当前的mochiweb_socket_server中实现了简单的监控树行为,后续我们会看到。

在继续看mochiweb_socket_server的init函数之前,我们先来看核心的数据结构,也就是mochiweb_socket_server这个record,我们可以看到这个record包含了将会被回调的loop,连接池,backlog等等的信息,而这个结构就是在刚才上面没有分析的parse_options中设置的,然后最终传递给init函数。可以看到这里面主要是一些将要传递给erlang的socket option。

[erlang]

-record(mochiweb_socket_server,

{port,

loop,

name=undefined,

%% NOTE: This is currently ignored.

max=2048,

ip=any,

listen=null,

nodelay=false,

backlog=128,

active_sockets=0,

acceptor_pool_size=16,

ssl=false,

ssl_opts=[{ssl_imp, new}],

acceptor_pool=sets:new(),

profile_fun=undefined}).

[/erlang]

然后来看init函数.这个函数主要是构造socket opts,然后传递listen函数。

[erlang]

init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog, nodelay=NoDelay}) ->

process_flag(trap_exit, true),

BaseOpts = [binary,

{reuseaddr, true},

{packet, 0},

{backlog, Backlog},

{recbuf, ?RECBUF_SIZE},

{active, false},

{nodelay, NoDelay}],

Opts = case Ip of

any ->

case ipv6_supported() of % IPv4, and IPv6 if supported

true -> [inet, inet6 | BaseOpts];

_ -> BaseOpts

end;

{_, _, _, _} -> % IPv4

[inet, {ip, Ip} | BaseOpts];

{_, _, _, _, _, _, _, _} -> % IPv6

[inet6, {ip, Ip} | BaseOpts]

end,

listen(Port, Opts, State).

[/erlang]

这里要注意最关键的一句那就是process_flag(trap_exit, true)这也就说明这个gen_server充当了supervision的角色,它会监控所有的子进程,而对应的重启策略也就类似supervision的 simple_one_one,这个后续会分析.

然后就是listen函数了。这里的设计非常巧妙,因为在一般的server设计中,比如http,都是来一个请求,accept到句柄,然后spawn一个进程,将句柄传递给子进程,用完然后销毁,可是在mochiweb中,不是这么做的,因为这么做会有一定的性能损失,因为始终还是有进程的切换。

在mochiweb中,会创建一个accept pool,这个个数默认是16,不过可以通过传递的参数修改的。这个accept pool,就是说mochiweb会首先启动16个子进程,都同时阻塞在accept调用上,然后如果一个请求过来,某个子进程被唤醒,唤醒之后,子进程会发消息给父进程,然后子进程此时就会变成一个worker进程,那就是说它不会在accept句柄了,这就有点像一个退化的过程。然后父进程接收到消息之后,会重新再启动一个对应的accept进程。

来看源码

[erlang]

new_acceptor_pool(Listen,

State=#mochiweb_socket_server{acceptor_pool=Pool,

acceptor_pool_size=Size,

loop=Loop}) ->

F = fun (_, S) ->

Pid = mochiweb_acceptor:start_link(self(), Listen, Loop),

sets:add_element(Pid, S)

end,

Pool1 = lists:foldl(F, Pool, lists:seq(1, Size)),

State#mochiweb_socket_server{acceptor_pool=Pool1}.

listen(Port, Opts, State=#mochiweb_socket_server{ssl=Ssl, ssl_opts=SslOpts}) ->

%%调用listen,如果不是https,则会调用gen_tcp:listen创建listen句柄

case mochiweb_socket:listen(Ssl, Port, Opts, SslOpts) of

{ok, Listen} ->

{ok, ListenPort} = mochiweb_socket:port(Listen),

{ok, new_acceptor_pool(

Listen,

State#mochiweb_socket_server{listen=Listen,

port=ListenPort})};

{error, Reason} ->

{stop, Reason}

end.

[/erlang]

上面的代码中,最核心的就是new_acceptor_pool这个函数,这个函数调用lists:foldl来循环调用mochiweb_acceptor:start_link启动accept子进程,然后将所有启动的子进程保存到acceptor_pool中。

然后就来看mochiweb_acceptor的start_link函数,这个函数很简单,就是启动一个子进程,然后链接到mochiweb_socket_server父进程。这里只需要注意一个地方,那就是accept到句柄之后,就直接给父进程发送了一个accepted消息。

[erlang]

start_link(Server, Listen, Loop) ->

proc_lib:spawn_link(?MODULE, init, [Server, Listen, Loop]).

init(Server, Listen, Loop) ->

T1 = now(),

case catch mochiweb_socket:accept(Listen) of

{ok, Socket} ->

%%发送消息

gen_server:cast(Server, {accepted, self(), timer:now_diff(now(), T1)}),

%%调用loop回调

call_loop(Loop, Socket);

{error, closed} ->

exit(normal);

{error, timeout} ->

init(Server, Listen, Loop);

{error, esslaccept} ->

exit(normal);

Other ->

error_logger:error_report(

[{application, mochiweb},

“Accept failed error”,

lists:flatten(io_lib:format(“~p”, [Other]))]),

exit({error, accept_failed})

end.

call_loop(Loop, Socket) ->

Loop(Socket).

[/erlang]

最后我们就来看父进程接收到accepted消息后,如何处理,这里还有一个多的要注意的,那就是整个server状态中,还保存了所有activesocket的个数,也就是已经accept的socket个数。

[erlang]

handle_cast({accepted, Pid, Timing},

State=#mochiweb_socket_server{active_sockets=ActiveSockets}) ->

%%active socket个数更新

State1 = State#mochiweb_socket_server{active_sockets=1 + ActiveSockets},

case State#mochiweb_socket_server.profile_fun of

undefined ->

undefined;

F when is_function(F) ->

catch F([{timing, Timing} | state_to_proplist(State1)])

end,

%%调用recyle_acceptor来回收子进程

{noreply, recycle_acceptor(Pid, State1)};

[/erlang]

然后就是recycle_acceptor函数,它用来重新启动新的acceptor进程。

[erlang]

recycle_acceptor(Pid, State=#mochiweb_socket_server{

acceptor_pool=Pool,

listen=Listen,

loop=Loop,

active_sockets=ActiveSockets}) ->

%%判断进程id是否是刚才启动的进程id。

case sets:is_element(Pid, Pool) of

true ->

%%重新启动一个acceptor进程

Acceptor = mochiweb_acceptor:start_link(self(), Listen, Loop),

%%更新状态

Pool1 = sets:add_element(Acceptor, sets:del_element(Pid, Pool)),

State#mochiweb_socket_server{acceptor_pool=Pool1};

false ->

State#mochiweb_socket_server{active_sockets=ActiveSockets – 1}

end.

[/erlang]

此时如果当子进程异常退出,那么要怎么办呢,这里也就是如何处理EXIT消息.这里主要是区分正常退出和异常退出,异常退出的话,需要打印日志,而正常的话,什么都不许要打印。剩余的操作都一致,那就是调用recycle_acceptor来看是否需要更新状态。

[erlang]

handle_info({‘EXIT’, Pid, normal}, State) ->

{noreply, recycle_acceptor(Pid, State)};

handle_info({‘EXIT’, Pid, Reason},

State=#mochiweb_socket_server{acceptor_pool=Pool}) ->

case sets:is_element(Pid, Pool) of

true ->

%% If there was an unexpected error accepting, log and sleep.

error_logger:error_report({?MODULE, ?LINE,

{acceptor_error, Reason}}),

timer:sleep(100);

false ->

ok

end,

{noreply, recycle_acceptor(Pid, State)};

[/erlang]

这次就分析到这里,后续的会分析mochiweb的处理请求部分。