首页 > erlang, 源码阅读 > hotwheels源码剖析

hotwheels源码剖析

原创文章,转载请注明: 转载自pagefault

本文链接地址: hotwheels源码剖析

在霸爷的推荐下,看了hotwheels的代码,接下来我就来分析下hotwheels的代码(主要是server端代码),hotwheels是干吗的呢,介绍在这里:

https://github.com/tolbrino/hotwheels

Janus is a messaging server optimized to unicast over TCP to thousands of clients subscribed to topics of interest.

The ultimate goal is to maintain a latency of less than 2 seconds for 20 thousand clients on Amazon EC2 (small instance).

首先来看janus.app:

{application, janus,
 [{description, "Janus"},
  {vsn, "0.0.1"},
  {id, "janus"},
  {modules, [barrier,
             bin,
             bot,
             client_proxy,
             common,
             flashbot,
             histo,
             janus,
             janus_acceptor,
             janus_admin,
             janus_app,
             janus_flash,
             launcher,
             mapper,
             pubsub,
             topman,
             t,
             transport,
             util
            ]},
  {registered, [janus_sup, 
                janus_topman_sup,
                janus_proxy_mapper_sup,
                janus_transport_sup,
                janus_listener]},
  {applications, [kernel, 
                  stdlib, 
                  mnesia,
                  inets
                 ]},
  {mod, {janus_app, []}},
  {env, []}
 ]
}.


具体每个域的意思这里就不介绍了,详细可以去看erlang的文档 http://www.erlang.org/doc/design_principles/applications.html

我们主要来看mod这个tuple,可以看到回调模块是janus_app,所以我们就从janus_app开始。

通过模块定义我们可以清楚的看到这个模块是一个application:

-module(janus_app).
-behaviour(application).

因此我们来看它的start函数:

-define(LISTEN_PORT, 8081).

start(_Type, _Args) ->
    Port = janus_admin:get_env(listen_port, ?LISTEN_PORT),
    supervisor:start_link({local, ?MODULE}, 
                          ?MODULE, 
                          [Port, transport]).

这里可以看到首先会从环境变量里面取得端口(命令行参数),而默认的port是8001,然后调用supervisor start_link函数,这个函数会启动创建并启动一个supervisor,这里可以看到回调模块是当前模块,因此我们接下来就来看当前模块的init函数.

init返回的child spec的格式我就不介绍了,可以去看erlang的手册

init([Port, Module]) ->
    {ok,
     {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
      [
       %% TCP server
       {janus_sup,
        {janus_acceptor, start_link, [self(), Port, Module]},
        permanent,
        2000,
        worker,
        [janus_acceptor]
       },
       %% Topic manager
       {janus_topman_sup,
        {topman, start, []},
        permanent,
        2000,
        worker,
        [topman]
       },
       %% Client proxy mapper
       {janus_proxy_mapper_sup,
        {mapper, start, [client_proxy_mapper]},
        permanent,
        2000,
        worker,
        [mapper]
       },
       %% Client instance supervisor
       {janus_transport_sup,
        {supervisor, start_link, [{local, janus_transport_sup}, 
                                  ?MODULE, [Module]]},
        permanent,
        infinity,
        supervisor,
        []
       }
      ]
     }
    };

从上面的代码可以看到这个supervisor一共会监控4个子进程,其中3个是worker,1个是supervisor。
对应的三个worker的名字分别是:

janus_sup(janus_acceptor:start_link())
janus_topman_sup(topman:start())
janus_proxy_mapper_sup(mapper:start(client_proxy_mapper))

而唯一的supervisor是janus_transport_sup(supervisor:start_link(transport))。
后面的括号注明了子进程的启动模块和回调函数。

从上面代码的注释可以看到每个子进程都是干嘛的,我们一个个来分析,首先来看第一个janus_sup进程,这个进程调用janus_acceptor模块的start_link启动的,所以我们来看janus_acceptor这个模块。

start_link(Parent, Port, Module) 
  when is_pid(Parent),
       is_integer(Port), 
       is_atom(Module) ->
    Args = [Parent, Port, Module],
    proc_lib:start_link(?MODULE, acceptor_init, Args).

这里可以看到代码比较简单,就是调用start_link启动一个子进程,子进程的模块就是当前模块,然后回调函数是acceptor_init,参数是一个list,包含三个参数,分别是父进程id,端口号,以及module, 父进程id所指的就是的supervisor的进程id,而module是指transport模块(可以看前面janus_app模块)。
这里要注意在调用proc_lib:start_link之前,一直是处于supervisor进程中的,当start_link之后,才是启动了子进程.这里使用了proc_lib:start_link,这个函数是同步的启动一个子进程,它会一直等待,直到子进程调用init_ack,才会返回.

因此接下来我们来看acceptor_init这个函数:

acceptor_init(Parent, Port, Module) ->
    State = #state{ 
      parent = Parent,
      port = Port,
      module = Module
     },
    error_logger:info_msg("Listening on port ~p~n", [Port]),
    case (catch do_init(State)) of
        {ok, ListenSocket} ->
            proc_lib:init_ack(State#state.parent, {ok, self()}),
            acceptor_loop(State#state{listener = ListenSocket});
        Error ->
            proc_lib:init_ack(Parent, Error),
            error
    end.

这个函数可以看到就是通过调用do_init来得到监听的listen socket,然后根据返回值来做一些操作,这里可以看到不论失败,成功都会调用init_ack来返回值给父进程,当成功之后,就会调用acceptor_loop来进入后续处理.

在看acceptor_loop之前,线来看do_init方法:

do_init(State) ->
    Opts = [binary, 
            {packet, 0}, 
            {reuseaddr, true},
            {backlog, 1024},
            {active, false}],
    case gen_tcp:listen(State#state.port, Opts) of
        {ok, ListenSocket} ->
            {ok, ListenSocket};
        {error, Reason} ->
            throw({error, {listen, Reason}})
    end.

这里调用gen_tcp的listen方法,我们着重来看传入listen的opts,这里可以看到active被设置为false,也就是每次必须主动地调用recv来读取数据。

然后来看acceptor_loop 函数,也就是server子进程的主循环函数,这个函数主要就是通过accept来接收客户端的连接,然后交给后续模块处理.

acceptor_loop(State) ->
    case (catch gen_tcp:accept(State#state.listener, 50000)) of
        {ok, Socket} ->
            handle_connection(State, Socket),
            ?MODULE:acceptor_loop(State);
        {error, Reason} ->
            handle_error(Reason),
            ?MODULE:acceptor_loop(State);
        {'EXIT', Reason} ->
            handle_error({'EXIT', Reason}),
            ?MODULE:acceptor_loop(State)
    end.

这里先暂停一下,我们先来看最后一个被supervisor监控的子进程,也就是一个子supervisor,janus_transport_sup。来看它的child spec:

       {janus_transport_sup,
        {supervisor, start_link, [{local, janus_transport_sup}, 
                                  ?MODULE, [Module]]},
        permanent,
        infinity,
        supervisor,
        []
       }

可以看到他会继续创建一个新的supervisor,然后也是当前模块(janus_app),只不过参数是一个参数,因此我们来看另外的一个init函数:

init([Module]) ->
    {ok,
     {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
      [
       %% TCP Client
       {undefined,
        {Module, start_link, []},
        temporary,
        2000,
        worker,
        []
       }
      ]
     }
    }.

可以看到这个child spec,重启策略是simple_one_one,也就是需要手动重启,并且它将会创建的子进程是Module(transport模块)的start_link函数来启动.

接下来就来看transport的启动函数以及init函数,这个模块是一个gen_server.


-behavior(gen_server).

start_link(Port) 
  when is_integer(Port) ->
    gen_server:start_link(?MODULE, [Port], []).

init([Port]) ->
    process_flag(trap_exit, true),
    {ok, #state{port = Port, transport = janus_flash }}.

这里需要注意的是process_flag(trap_exit, true),这个其实也就是设置表示父进程将会接收子进程的crash信息。还有一个就是state,这里state的transport设置为了janus_flash模块.

ok,然后我们再回到janus_acceptor模块,接下来来看假设有一个连接过来之后的情况。这里跳过错误处理,就来看看正确的处理流程。

            handle_connection(State, Socket),
            ?MODULE:acceptor_loop(State);

当正确接到新的连接之后,会进入handle_connection的处理,然后调用acceptor_loop进入递归.因此我们就来看handle_connection

handle_connection(State, Socket) ->
    {ok, Pid} = janus_app:start_transport(State#state.port),
    ok = gen_tcp:controlling_process(Socket, Pid),
    %% Instruct the new handler to own the socket.
    (State#state.module):set_socket(Pid, Socket).

这里做了3个操作,首先调用janus_app:start_transport来启动一个新的子进程,而这个子进程是属于那个supervisor呢,来看代码:

start_transport(Port) ->
    supervisor:start_child(janus_transport_sup, [Port]).

可以看到它启动了janus_transport_sup这个supervisor的子进程,而我们还记得前面分析的,这个supervisor的子进程的启动回调就是transport模块的start_link函数。这里要注意start_child返回的是子进程的pid.

start_link(Port) 
  when is_integer(Port) ->
    gen_server:start_link(?MODULE, [Port], []).

然后接下来的两个操作,就是将当前进程接受到的socket传递给新建的子进程,然后调用transport的set_socket方法。然后我们来看transport模块的set_socket方法.

set_socket(Ref, Sock) ->
    gen_server:cast(Ref, {set_socket, Sock}).

可以看到就是给新建的子进程发送一个set_socket的方法.这里要注意就是会设置socket的属性,也就是设置active为once。

handle_cast({set_socket, Socket}, State) ->
    inet:setopts(Socket, [{active, once}, 
                          {packet, 0}, 
                          binary]),    
    {ok, Keep, Ref} = (State#state.transport):start(Socket),
    keep_alive_or_close(Keep, State#state{socket = Socket, state = Ref});

这里可以看到调用了state的transport的start方法,那么这个transport是那个模块呢,上面的分析中在当前transport的init方法中返回e设置的就是janus_flash模块,所以这里调用的就是janus_flash:start方法.

start(Socket) ->
    Send = fun(Bin) -> gen_tcp:send(Socket, [Bin, 1]) end,
    {ok, Proxy, Token} = client_proxy:start(Send),
    State = #state{
      socket = Socket, 
      proxy = Proxy, 
      token = Token
     },
    JSON = {struct,
            [{<<"timestamp">>, tuple_to_list(now())},
             {<<"token">>, Token}
            ]},
    send(mochijson2:encode(JSON), State).

这里可以看到先是创建了一个send方法,然后调用client_proxy start,这里client_proxy其实是一个gen_server,因此我们来看这个模块的start方法以及 init方法.

start(Send) ->
    Token = common:random_token(),
    {ok, Pid} = gen_server:start_link(?MODULE, [Token, self(), Send], []),
    {ok, Pid, Token}.

init([Token, Parent, Send]) ->
    process_flag(trap_exit, true),
    ok = mapper:add(client_proxy_mapper, Token),
    State = #state{
      token = Token,
      parent = Parent,
      send = Send,
      messages = []
     },
   {ok, State}.

可以看到init方法里面调用了mapper模块的add方法,因此来看mapper:add方法

add(Ref, Key) ->
    gen_server:call(Ref, {add, Key, self()}).

可以看到也就是给client_proxy_mapper这个进程发送了一个同步的消息,而对应的client_proxy_mapper也就是一开始在janus_app模块中注册的进程,这个进程就是mapper模块启动的。因此来看mapper的对应同步消息接收。

handle_call({add, Key, Pid}, _, State) ->
    case ets:lookup(State#state.key_pid, Key) of
        [_] ->
            ok;
        _ ->
            Ref = erlang:monitor(process, Pid),
            ets:insert(State#state.key_pid, {Key, {Pid, Ref}}),
            ets:insert(State#state.pid_key, {Pid, Key})
    end,
    {reply, ok, State};

这里也就是将随机出来的token和进程通过ets关联。

前面这里对于数据的发送分析完了,剩下的就是连接的错误,断开处理以及数据的接收处理,线来看连接的接收处理,通过上面的分析,我们知道,accept到的socket是处于transport这个gen_server管理的,因此读取数据就在这个里面处理:

handle_info({tcp, Socket, <<"<regular-socket/>", 0, Bin/binary>>}, State)
  when Socket == State#state.socket ->
    inet:setopts(Socket, [{active, once}]),
    dispatch(Bin, janus_flash, State);

这里主要还是调用dispatch来处理数据的读取,先是调用janus_flash的process方法,然后调用keep_alive_or_close来判断是否连接已经关闭.

dispatch(Data, Mod, State = #state{transport = Mod}) ->
    {ok, Keep, TS} = Mod:process(Data, State#state.state),
    keep_alive_or_close(Keep, State#state{state = TS}).
Share
分类: erlang, 源码阅读 标签: ,
  1. 2012年4月18日09:07 | #1

    个人觉得hotwheel 中代码最经典的地方就是那个客户端连接进程transport 的启动、监控、socket绑定;以及gen_tcp的once方式下的数据接收。这个模型可以用在很多地方。

    [回复]

  1. 本文目前尚无任何 trackbacks 和 pingbacks.