TinyMQ学習(2)ソースコード


前のセクション(
http://diaocow.iteye.com/blog/1734253)TinyMQについて概説したが,この節では,著者らがメッセージのパブリケーション/サブスクリプションをどのように実現するかに重点を置いている.
ソースコードを見る前にmoduleと関数の役割を理解する必要があります.
dict:new()
     Key-Value Dictionary(         map)

dict:find(Key, Dict) -> {ok, Value} | error
 Dict1  Key   Value,      {ok, Value}     error

dict:store(Key, Value, Dict1) -> Dict2
 Dict1     Key-Value,      Dict2( Key    ,         Value)

-----------------------------------------------------------------------------
lists:foldl(Fun, Acc0, List) -> Acc1
  Fun     ,      :      List    (  ->    ),      Fun          (         Acc0),        :
> lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
15
    :
  ,Fun      ,X     List      1,Sum 0,
Fun      ,X     List      2,Sum Fun       1
    ,  List        ,foldl             。

lists:foldr(Fun, Acc0, List) -> Acc1
 lists:foldl,       List       -> 

-----------------------------------------------------------------------------
gb_trees module
gb_trees   Prof. Arne Anderssons General Balanced Trees,      AVL        .    gb_trees    tiny_pq.erl(     )         ;                   ,                   priority

上の関数の説明を見て、tinymqのソースコードディレクトリを見てみましょう.主に以下のファイルがあります.
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.Erl(これは私が後で自分で入れたものです)
tinymq_app.Erlはアプリケーション全体を起動するためのApplication Callback Moduleです.
start(_StartType, _StartArgs) ->
    tinymq_sup:start_link().   

tinymq_sup:start_link()はまた何をしましたか.
%%   supervisor(tinymq_supervisor        (            ))
start_link() ->
    supervisor:start_link(?MODULE, []).

%% tinymq_supervisor    child process:
%%    worker proces       mq     
%%     supervisor process       Channel  
init([]) ->

	%% child process  (          erlang  )
    MqWorker = {mq_controller, {tinymq_controller, start_link, []},
           permanent, 2000, worker, [tinymq_controller]},

    ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []},
                  permanent, 2000, supervisor, [tinymq_channel_sup]},

    Children = [MqWorker, ChannelSup],

    %%      :one_for_one,     10      10 
    RestartStrategy = {one_for_one, 10, 10},
    {ok, {RestartStrategy, Children}}.

これでトップレベルのsupervisorが作成され、モニタリングツリーが初期化されます(ワークプロセスとsupervisorプロセスが2つ作成されています).次に、この2つのchildプロセスを見てみましょう.
1.worker process

tinymq_controller.erl  

-record(state, {dict, max_age}).

%%   gen_server  ,       tinymq   
start_link() ->
    gen_server:start_link({local, tinymq}, ?MODULE, [], []).

init([]) ->
    %%       :      (Application Resource File    60s)
    {ok, MaxAgeSeconds} = application:get_env(max_age), 
    %%    tinymq    (         ,    Channel ChannelPid       ,            )
    {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
 
もう一つのchild processを見てみましょう
2.supervisor process

tinymq_channel_sup.erl  

%%     supervisor(tinymq_channel_supervisor)
start_link() ->
    supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []).

%%  supervior   child process       
init(_StartArgs) ->
	%% child process   
    {ok, {{simple_one_for_one, 0, 10},
          [
           {mq_channel_controller, {tinymq_channel_controller, start_link, []},
            temporary, 2000, worker, [tinymq_channel_controller]}
          ]}}.

モニタツリーが初期化され、tinymqサービスも開始されました.tinymqがクライアントのサブスクリプションリクエスト(subscribe request)をどのように処理するかを見てみましょう.
tinymq_controller  :

%%              
handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) ->    
    %%    Channel      pid(      Channel         Process)
    {ChannelPid, NewState} = find_or_create_channel(Channel, State),
    %%              Channle    
    gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}),
    {noreply, NewState};

find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) ->
    %%    Channel    Channel      ,        Pid
    case dict:find(Channel, Chan2Pid) of
        {ok, Pid} ->
            {Pid, State};
        _ ->
            %%         Channel       tinymq_channel_sup    
            {ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]),
            %%     Channel ChannelPid    (add) 
            {ChannelPid, State#state{
                    dict = dict:store(Channel, ChannelPid, Chan2Pid)
                }} 
    end.

次に、チャネルサービスがこのサブスクリプションリクエストをどのように処理しているかを見てみましょう.
tinymq_channel_controller  

%%     Channel  
start_link(MaxAge, ChannelSup, Channel) ->
    gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []).

%%         Channel     ,         ,             :
%% ---Channel (       topic )
%% ---    (         ,                 )
%% ---     (                       )
%% ---      (      ,          )
%% ---            (                1 )
%% ---         
init([MaxAge, ChannelSup, Channel]) ->
    {ok, #state{
            max_age = MaxAge,
            supervisor = ChannelSup,       
            channel = Channel,             
            messages = gb_trees:empty(),   
            last_pull = now_to_micro_seconds(erlang:now()),    
            last_purge = now_to_micro_seconds(erlang:now()) },  
     MaxAge * 1000}.

チャネルサービスが購読要求を受信すると、

tinymq_channel_controller  

%%        
handle_cast({From, subscribe, 'now', Subscriber}, State) ->
    %%        (                  ,      )
    NewSubscribers = add_subscriber(Subscriber, State#state.subscribers), 
    %%           (               Channle(topic)     )
    gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}),     
    %%             ,    Channel state
    %%       :Purging old messages occurs after any channel activity (but no more than once per second)
    {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })};  

%%      
add_subscriber(NewSubscriber, Subscribers) ->
        case lists:keymember(NewSubscriber, 2, Subscribers) of
        %%           ,      
        true -> Subscribers; 
        %%            {monitorSubscriberPid, SubscriberPid}
        false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers]  
    end.

%%               (       1 )
purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),	%% now_to_micro_seconds,               
    LastPurge = State#state.last_purge,     	%%            
    Duration = seconds_to_micro_seconds(1), 	%%     (   1 )
    if
        Now - LastPurge > Duration ->    
            State#state{ 
                %% @tiny_pq:prune_old(Tree, Priority) -> Tree1  Remove nodes with priority less than or equal to `Priority'
                %%    Tree      messages, Priority  Time(    -      ,      Priority    ,       )
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                %%              
                last_purge = Now };
        true ->
            State
    end.

これでtinymqはクライアントのサブスクリプション要求を処理し,前節に示すサブスクリプション・エンド・フローチャートをレビューし,理解を深めることができる.
次にtinymqがクライアントのメッセージリクエストにどのように応答するかを見てみましょう.
クライアントのすべてのリクエストはtinymq serverに送信され、チャネルサービス処理ごとに送信されることを覚えておいてください.
tinymq_controller  :

%%              
handle_call({push, Channel, Message}, From, State) ->
    %%    Channel      pid,     
    {ChannelPid, NewState} = find_or_create_channel(Channel, State),
    %%           Channel process  
    gen_server:cast(ChannelPid, {From, push, Message}),
    {noreply, NewState};

チャネルサービスがメッセージ送信要求を受信すると、

tinymq_channel_controller  
(         ,                  ,       ,         ):

%%         
handle_cast({From, push, Message}, State) ->
    Now = now_to_micro_seconds(erlang:now()),  

    %%             
    LastPull = lists:foldr(fun({Ref, Sub}, _) -> 
                Sub ! {self(), Now, [Message]},   
                %% erlang:demonitor(Ref),   monitor 
                Now    %%         :         
        end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers   List,    Channel      

    %%      (publisher)      
    gen_server:reply(From, {ok, Now}),  
    %%             
    State2 = purge_old_messages(State),
    %%            (priorit,message, tree)                                    
    NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages), 
    %%       :Subscribers are removed from the channel as soon as the first message is delivered   :subscribers = []
    {noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};

メッセージコードを送信するのも容易であることがわかりました.同様に、前節にリストされた送信図の論理図を振り返って理解を深めることができます.
これでtinymqのパブリケーション/サブスクリプションの主な実装コードを確認し、テストコードを書きます.
購読者:
-module(subscriber).
-export([start/1]).

start(Num) ->
	create_subscriber(Num).

create_subscriber(0) -> ok;
create_subscriber(Num) ->
	spawn(fun subscribe/0),
	create_subscriber(Num - 1).

subscribe() ->
	tinymq:subscribe("hello_channel", now, self()),
	loop().

loop() ->
	receive
   		{_From, _Timestamp, Messages} ->
        	io:format("Pid: ~p received messages: ~p~n", [self(), Messages])
	end,
	loop().

start関数のNumは、いくつかのサブスクライバを作成することを示しています.コードは簡単ですが、説明は省略します.
パブリッシャ
-module(publisher).
-export([start/0]).

start() ->
	spawn(fun publish/0).

publish() ->
	Messages = {hello_erlang, erlang:now()},
    io:format("Pid: ~p send messages: ~p~n", [self(), Messages]),
	tinymq:push("hello_channel", Messages),

	receive
	after 5000 ->
		true
	end,

	publish().

パブリッシャーは5秒ごとにチャンネルにメッセージを送信する
最後に、実行効果図を見てみましょう
TinyMQ学习(2) 源码_第1张图片
TinyMQのソースコードについてはここまでですが、詳細は私が掘り起こす価値があります(例えばmonitorSubscriber、優先順位キュー実装アルゴリズム)、サービス状態の持続化、unsubscribeリクエスト処理の追加など)、独自のerlang基盤の再補完が必要です.公式ドキュメントを見てみましょう.
ps:iteyeのerlangコードのハイライトは本当に...、お疲れ様でした....