在霸爷的推荐下,看了hotwheels的代码,接下来我就来分析下hotwheels的代码(主要是server端代码),hotwheels是干吗的呢,介绍在这里:

https://github.com/tolbrino/hotwheels

Janus is a messaging server optimized to unicast over TCP to thousands of clients subscribed to topics of interest.

The ultimate goal is to maintain a latency of less than 2 seconds for 20 thousand clients on Amazon EC2 (small instance).

首先来看janus.app:

[erlang]

{application, janus,

[{description, “Janus”},

{vsn, “0.0.1”},

{id, “janus”},

{modules, [barrier,

bin,

bot,

client_proxy,

common,

flashbot,

histo,

janus,

janus_acceptor,

janus_admin,

janus_app,

janus_flash,

launcher,

mapper,

pubsub,

topman,

t,

transport,

util

]},

{registered, [janus_sup,

janus_topman_sup,

janus_proxy_mapper_sup,

janus_transport_sup,

janus_listener]},

{applications, [kernel,

stdlib,

mnesia,

inets

]},

{mod, {janus_app, []}},

{env, []}

]

}.

[/erlang]

具体每个域的意思这里就不介绍了,详细可以去看erlang的文档 http://www.erlang.org/doc/design_principles/applications.html

我们主要来看mod这个tuple,可以看到回调模块是janus_app,所以我们就从janus_app开始。

通过模块定义我们可以清楚的看到这个模块是一个application:

[erlang]

-module(janus_app).

-behaviour(application).

[/erlang]

因此我们来看它的start函数:

[erlang]

-define(LISTEN_PORT, 8081).

start(_Type, _Args) ->

Port = janus_admin:get_env(listen_port, ?LISTEN_PORT),

supervisor:start_link({local, ?MODULE},

?MODULE,

[Port, transport]).

[/erlang]

这里可以看到首先会从环境变量里面取得端口(命令行参数),而默认的port是8001,然后调用supervisor start_link函数,这个函数会启动创建并启动一个supervisor,这里可以看到回调模块是当前模块,因此我们接下来就来看当前模块的init函数.

init返回的child spec的格式我就不介绍了,可以去看erlang的手册

[erlang]

init([Port, Module]) ->

{ok,

{_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},

[

%% TCP server

{janus_sup,

{janus_acceptor, start_link, [self(), Port, Module]},

permanent,

2000,

worker,

[janus_acceptor]

},

%% Topic manager

{janus_topman_sup,

{topman, start, []},

permanent,

2000,

worker,

[topman]

},

%% Client proxy mapper

{janus_proxy_mapper_sup,

{mapper, start, [client_proxy_mapper]},

permanent,

2000,

worker,

[mapper]

},

%% Client instance supervisor

{janus_transport_sup,

{supervisor, start_link, [{local, janus_transport_sup},

?MODULE, [Module]]},

permanent,

infinity,

supervisor,

[]

}

]

}

};

[/erlang]

从上面的代码可以看到这个supervisor一共会监控4个子进程,其中3个是worker,1个是supervisor。

对应的三个worker的名字分别是:

janus_sup(janus_acceptor:start_link())

janus_topman_sup(topman:start())

janus_proxy_mapper_sup(mapper:start(client_proxy_mapper))

而唯一的supervisor是janus_transport_sup(supervisor:start_link(transport))。

后面的括号注明了子进程的启动模块和回调函数。

从上面代码的注释可以看到每个子进程都是干嘛的,我们一个个来分析,首先来看第一个janus_sup进程,这个进程调用janus_acceptor模块的start_link启动的,所以我们来看janus_acceptor这个模块。

[erlang]

start_link(Parent, Port, Module)

when is_pid(Parent),

is_integer(Port),

is_atom(Module) ->

Args = [Parent, Port, Module],

proc_lib:start_link(?MODULE, acceptor_init, Args).

[/erlang]

这里可以看到代码比较简单,就是调用start_link启动一个子进程,子进程的模块就是当前模块,然后回调函数是acceptor_init,参数是一个list,包含三个参数,分别是父进程id,端口号,以及module, 父进程id所指的就是的supervisor的进程id,而module是指transport模块(可以看前面janus_app模块)。

这里要注意在调用proc_lib:start_link之前,一直是处于supervisor进程中的,当start_link之后,才是启动了子进程.这里使用了proc_lib:start_link,这个函数是同步的启动一个子进程,它会一直等待,直到子进程调用init_ack,才会返回.

因此接下来我们来看acceptor_init这个函数:

[erlang]

acceptor_init(Parent, Port, Module) ->

State = #state{

parent = Parent,

port = Port,

module = Module

},

error_logger:info_msg(“Listening on port pn”, [Port]),

case (catch do_init(State)) of

{ok, ListenSocket} ->

proc_lib:init_ack(State#state.parent, {ok, self()}),

acceptor_loop(State#state{listener = ListenSocket});

Error ->

proc_lib:init_ack(Parent, Error),

error

end.

[/erlang]

这个函数可以看到就是通过调用do_init来得到监听的listen socket,然后根据返回值来做一些操作,这里可以看到不论失败,成功都会调用init_ack来返回值给父进程,当成功之后,就会调用acceptor_loop来进入后续处理.

在看acceptor_loop之前,线来看do_init方法:

[erlang]

do_init(State) ->

Opts = [binary,

{packet, 0},

{reuseaddr, true},

{backlog, 1024},

{active, false}],

case gen_tcp:listen(State#state.port, Opts) of

{ok, ListenSocket} ->

{ok, ListenSocket};

{error, Reason} ->

throw({error, {listen, Reason}})

end.

[/erlang]

这里调用gen_tcp的listen方法,我们着重来看传入listen的opts,这里可以看到active被设置为false,也就是每次必须主动地调用recv来读取数据。

然后来看acceptor_loop 函数,也就是server子进程的主循环函数,这个函数主要就是通过accept来接收客户端的连接,然后交给后续模块处理.

[erlang]

acceptor_loop(State) ->

case (catch gen_tcp:accept(State#state.listener, 50000)) of

{ok, Socket} ->

handle_connection(State, Socket),

?MODULE:acceptor_loop(State);

{error, Reason} ->

handle_error(Reason),

?MODULE:acceptor_loop(State);

{‘EXIT’, Reason} ->

handle_error({‘EXIT’, Reason}),

?MODULE:acceptor_loop(State)

end.

[/erlang]

这里先暂停一下,我们先来看最后一个被supervisor监控的子进程,也就是一个子supervisor,janus_transport_sup。来看它的child spec:

[erlang]

{janus_transport_sup,

{supervisor, start_link, [{local, janus_transport_sup},

?MODULE, [Module]]},

permanent,

infinity,

supervisor,

[]

}

[/erlang]

可以看到他会继续创建一个新的supervisor,然后也是当前模块(janus_app),只不过参数是一个参数,因此我们来看另外的一个init函数:

[erlang]

init([Module]) ->

{ok,

{_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},

[

%% TCP Client

{undefined,

{Module, start_link, []},

temporary,

2000,

worker,

[]

}

]

}

}.

[/erlang]

可以看到这个child spec,重启策略是simple_one_one,也就是需要手动重启,并且它将会创建的子进程是Module(transport模块)的start_link函数来启动.

接下来就来看transport的启动函数以及init函数,这个模块是一个gen_server.

[erlang]

-behavior(gen_server).

start_link(Port)

when is_integer(Port) ->

gen_server:start_link(?MODULE, [Port], []).

init([Port]) ->

process_flag(trap_exit, true),

{ok, #state{port = Port, transport = janus_flash }}.

[/erlang]

这里需要注意的是process_flag(trap_exit, true),这个其实也就是设置表示父进程将会接收子进程的crash信息。还有一个就是state,这里state的transport设置为了janus_flash模块.

ok,然后我们再回到janus_acceptor模块,接下来来看假设有一个连接过来之后的情况。这里跳过错误处理,就来看看正确的处理流程。

[erlang]

handle_connection(State, Socket),

?MODULE:acceptor_loop(State);

[/erlang]

当正确接到新的连接之后,会进入handle_connection的处理,然后调用acceptor_loop进入递归.因此我们就来看handle_connection

[erlang]

handle_connection(State, Socket) ->

{ok, Pid} = janus_app:start_transport(State#state.port),

ok = gen_tcp:controlling_process(Socket, Pid),

%% Instruct the new handler to own the socket.

(State#state.module):set_socket(Pid, Socket).

[/erlang]

这里做了3个操作,首先调用janus_app:start_transport来启动一个新的子进程,而这个子进程是属于那个supervisor呢,来看代码:

[erlang]

start_transport(Port) ->

supervisor:start_child(janus_transport_sup, [Port]).

[/erlang]

可以看到它启动了janus_transport_sup这个supervisor的子进程,而我们还记得前面分析的,这个supervisor的子进程的启动回调就是transport模块的start_link函数。这里要注意start_child返回的是子进程的pid.

[erlang]

start_link(Port)

when is_integer(Port) ->

gen_server:start_link(?MODULE, [Port], []).

[/erlang]

然后接下来的两个操作,就是将当前进程接受到的socket传递给新建的子进程,然后调用transport的set_socket方法。然后我们来看transport模块的set_socket方法.

[erlang]

set_socket(Ref, Sock) ->

gen_server:cast(Ref, {set_socket, Sock}).

[/erlang]

可以看到就是给新建的子进程发送一个set_socket的方法.这里要注意就是会设置socket的属性,也就是设置active为once。

[erlang]

handle_cast({set_socket, Socket}, State) ->

inet:setopts(Socket, [{active, once},

{packet, 0},

binary]),

{ok, Keep, Ref} = (State#state.transport):start(Socket),

keep_alive_or_close(Keep, State#state{socket = Socket, state = Ref});

[/erlang]

这里可以看到调用了state的transport的start方法,那么这个transport是那个模块呢,上面的分析中在当前transport的init方法中返回e设置的就是janus_flash模块,所以这里调用的就是janus_flash:start方法.

[erlang]

start(Socket) ->

Send = fun(Bin) -> gen_tcp:send(Socket, [Bin, 1]) end,

{ok, Proxy, Token} = client_proxy:start(Send),

State = #state{

socket = Socket,

proxy = Proxy,

token = Token

},

JSON = {struct,

[{<<”timestamp”>>, tuple_to_list(now())},

{<<”token”>>, Token}

]},

send(mochijson2:encode(JSON), State).

[/erlang]

这里可以看到先是创建了一个send方法,然后调用client_proxy start,这里client_proxy其实是一个gen_server,因此我们来看这个模块的start方法以及 init方法.

[erlang]

start(Send) ->

Token = common:random_token(),

{ok, Pid} = gen_server:start_link(?MODULE, [Token, self(), Send], []),

{ok, Pid, Token}.

init([Token, Parent, Send]) ->

process_flag(trap_exit, true),

ok = mapper:add(client_proxy_mapper, Token),

State = #state{

token = Token,

parent = Parent,

send = Send,

messages = []

},

{ok, State}.

[/erlang]

可以看到init方法里面调用了mapper模块的add方法,因此来看mapper:add方法

[erlang]

add(Ref, Key) ->

gen_server:call(Ref, {add, Key, self()}).

[/erlang]

可以看到也就是给client_proxy_mapper这个进程发送了一个同步的消息,而对应的client_proxy_mapper也就是一开始在janus_app模块中注册的进程,这个进程就是mapper模块启动的。因此来看mapper的对应同步消息接收。

[erlang]

handle_call({add, Key, Pid}, _, State) ->

case ets:lookup(State#state.key_pid, Key) of

[_] ->

ok;

_ ->

Ref = erlang:monitor(process, Pid),

ets:insert(State#state.key_pid, {Key, {Pid, Ref}}),

ets:insert(State#state.pid_key, {Pid, Key})

end,

{reply, ok, State};

[/erlang]

这里也就是将随机出来的token和进程通过ets关联。

前面这里对于数据的发送分析完了,剩下的就是连接的错误,断开处理以及数据的接收处理,线来看连接的接收处理,通过上面的分析,我们知道,accept到的socket是处于transport这个gen_server管理的,因此读取数据就在这个里面处理:

[erlang]

handle_info({tcp, Socket, <<”“, 0, Bin/binary>>}, State)

when Socket == State#state.socket ->

inet:setopts(Socket, [{active, once}]),

dispatch(Bin, janus_flash, State);

[/erlang]

这里主要还是调用dispatch来处理数据的读取,先是调用janus_flash的process方法,然后调用keep_alive_or_close来判断是否连接已经关闭.

[erlang]

dispatch(Data, Mod, State = #state{transport = Mod}) ->

{ok, Keep, TS} = Mod:process(Data, State#state.state),

keep_alive_or_close(Keep, State#state{state = TS}).

[/erlang]