TinyMQ学習(2)ソースコード
前のセクション(
http://diaocow.iteye.com/blog/1734253)TinyMQについて概説したが,この節では,著者らがメッセージのパブリケーション/サブスクリプションをどのように実現するかに重点を置いている.
ソースコードを見る前にmoduleと関数の役割を理解する必要があります.
上の関数の説明を見て、tinymqのソースコードディレクトリを見てみましょう.主に以下のファイルがあります.
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.Erl(これは私が後で自分で入れたものです)
tinymq_app.Erlはアプリケーション全体を起動するためのApplication Callback Moduleです.
tinymq_sup:start_link()はまた何をしましたか.
これでトップレベルのsupervisorが作成され、モニタリングツリーが初期化されます(ワークプロセスとsupervisorプロセスが2つ作成されています).次に、この2つのchildプロセスを見てみましょう.
1.worker process
もう一つのchild processを見てみましょう
2.supervisor process
モニタツリーが初期化され、tinymqサービスも開始されました.tinymqがクライアントのサブスクリプションリクエスト(subscribe request)をどのように処理するかを見てみましょう.
次に、チャネルサービスがこのサブスクリプションリクエストをどのように処理しているかを見てみましょう.
チャネルサービスが購読要求を受信すると、
これでtinymqはクライアントのサブスクリプション要求を処理し,前節に示すサブスクリプション・エンド・フローチャートをレビューし,理解を深めることができる.
次にtinymqがクライアントのメッセージリクエストにどのように応答するかを見てみましょう.
クライアントのすべてのリクエストはtinymq serverに送信され、チャネルサービス処理ごとに送信されることを覚えておいてください.
チャネルサービスがメッセージ送信要求を受信すると、
メッセージコードを送信するのも容易であることがわかりました.同様に、前節にリストされた送信図の論理図を振り返って理解を深めることができます.
これでtinymqのパブリケーション/サブスクリプションの主な実装コードを確認し、テストコードを書きます.
購読者:
start関数のNumは、いくつかのサブスクライバを作成することを示しています.コードは簡単ですが、説明は省略します.
パブリッシャ
パブリッシャーは5秒ごとにチャンネルにメッセージを送信する
最後に、実行効果図を見てみましょう
TinyMQのソースコードについてはここまでですが、詳細は私が掘り起こす価値があります(例えばmonitorSubscriber、優先順位キュー実装アルゴリズム)、サービス状態の持続化、unsubscribeリクエスト処理の追加など)、独自のerlang基盤の再補完が必要です.公式ドキュメントを見てみましょう.
ps:iteyeのerlangコードのハイライトは本当に...、お疲れ様でした....
http://diaocow.iteye.com/blog/1734253)TinyMQについて概説したが,この節では,著者らがメッセージのパブリケーション/サブスクリプションをどのように実現するかに重点を置いている.
ソースコードを見る前にmoduleと関数の役割を理解する必要があります.
dict:new()
Key-Value Dictionary( map)
dict:find(Key, Dict) -> {ok, Value} | error
Dict1 Key Value, {ok, Value} error
dict:store(Key, Value, Dict1) -> Dict2
Dict1 Key-Value, Dict2( Key , Value)
-----------------------------------------------------------------------------
lists:foldl(Fun, Acc0, List) -> Acc1
Fun , : List ( -> ), Fun ( Acc0), :
> lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
15
:
,Fun ,X List 1,Sum 0,
Fun ,X List 2,Sum Fun 1
, List ,foldl 。
lists:foldr(Fun, Acc0, List) -> Acc1
lists:foldl, List ->
-----------------------------------------------------------------------------
gb_trees module
gb_trees Prof. Arne Anderssons General Balanced Trees, AVL . gb_trees tiny_pq.erl( ) ; , priority
上の関数の説明を見て、tinymqのソースコードディレクトリを見てみましょう.主に以下のファイルがあります.
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.Erl(これは私が後で自分で入れたものです)
tinymq_app.Erlはアプリケーション全体を起動するためのApplication Callback Moduleです.
start(_StartType, _StartArgs) ->
tinymq_sup:start_link().
tinymq_sup:start_link()はまた何をしましたか.
%% supervisor(tinymq_supervisor ( ))
start_link() ->
supervisor:start_link(?MODULE, []).
%% tinymq_supervisor child process:
%% worker proces mq
%% supervisor process Channel
init([]) ->
%% child process ( erlang )
MqWorker = {mq_controller, {tinymq_controller, start_link, []},
permanent, 2000, worker, [tinymq_controller]},
ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []},
permanent, 2000, supervisor, [tinymq_channel_sup]},
Children = [MqWorker, ChannelSup],
%% :one_for_one, 10 10
RestartStrategy = {one_for_one, 10, 10},
{ok, {RestartStrategy, Children}}.
これでトップレベルのsupervisorが作成され、モニタリングツリーが初期化されます(ワークプロセスとsupervisorプロセスが2つ作成されています).次に、この2つのchildプロセスを見てみましょう.
1.worker process
tinymq_controller.erl
-record(state, {dict, max_age}).
%% gen_server , tinymq
start_link() ->
gen_server:start_link({local, tinymq}, ?MODULE, [], []).
init([]) ->
%% : (Application Resource File 60s)
{ok, MaxAgeSeconds} = application:get_env(max_age),
%% tinymq ( , Channel ChannelPid , )
{ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
もう一つのchild processを見てみましょう
2.supervisor process
tinymq_channel_sup.erl
%% supervisor(tinymq_channel_supervisor)
start_link() ->
supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []).
%% supervior child process
init(_StartArgs) ->
%% child process
{ok, {{simple_one_for_one, 0, 10},
[
{mq_channel_controller, {tinymq_channel_controller, start_link, []},
temporary, 2000, worker, [tinymq_channel_controller]}
]}}.
モニタツリーが初期化され、tinymqサービスも開始されました.tinymqがクライアントのサブスクリプションリクエスト(subscribe request)をどのように処理するかを見てみましょう.
tinymq_controller :
%%
handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) ->
%% Channel pid( Channel Process)
{ChannelPid, NewState} = find_or_create_channel(Channel, State),
%% Channle
gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}),
{noreply, NewState};
find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) ->
%% Channel Channel , Pid
case dict:find(Channel, Chan2Pid) of
{ok, Pid} ->
{Pid, State};
_ ->
%% Channel tinymq_channel_sup
{ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]),
%% Channel ChannelPid (add)
{ChannelPid, State#state{
dict = dict:store(Channel, ChannelPid, Chan2Pid)
}}
end.
次に、チャネルサービスがこのサブスクリプションリクエストをどのように処理しているかを見てみましょう.
tinymq_channel_controller
%% Channel
start_link(MaxAge, ChannelSup, Channel) ->
gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []).
%% Channel , , :
%% ---Channel ( topic )
%% --- ( , )
%% --- ( )
%% --- ( , )
%% --- ( 1 )
%% ---
init([MaxAge, ChannelSup, Channel]) ->
{ok, #state{
max_age = MaxAge,
supervisor = ChannelSup,
channel = Channel,
messages = gb_trees:empty(),
last_pull = now_to_micro_seconds(erlang:now()),
last_purge = now_to_micro_seconds(erlang:now()) },
MaxAge * 1000}.
チャネルサービスが購読要求を受信すると、
tinymq_channel_controller
%%
handle_cast({From, subscribe, 'now', Subscriber}, State) ->
%% ( , )
NewSubscribers = add_subscriber(Subscriber, State#state.subscribers),
%% ( Channle(topic) )
gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}),
%% , Channel state
%% :Purging old messages occurs after any channel activity (but no more than once per second)
{noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })};
%%
add_subscriber(NewSubscriber, Subscribers) ->
case lists:keymember(NewSubscriber, 2, Subscribers) of
%% ,
true -> Subscribers;
%% {monitorSubscriberPid, SubscriberPid}
false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers]
end.
%% ( 1 )
purge_old_messages(State) ->
Now = now_to_micro_seconds(erlang:now()), %% now_to_micro_seconds,
LastPurge = State#state.last_purge, %%
Duration = seconds_to_micro_seconds(1), %% ( 1 )
if
Now - LastPurge > Duration ->
State#state{
%% @tiny_pq:prune_old(Tree, Priority) -> Tree1 Remove nodes with priority less than or equal to `Priority'
%% Tree messages, Priority Time( - , Priority , )
messages = tiny_pq:prune_old(State#state.messages,
Now - seconds_to_micro_seconds(State#state.max_age)),
%%
last_purge = Now };
true ->
State
end.
これでtinymqはクライアントのサブスクリプション要求を処理し,前節に示すサブスクリプション・エンド・フローチャートをレビューし,理解を深めることができる.
次にtinymqがクライアントのメッセージリクエストにどのように応答するかを見てみましょう.
クライアントのすべてのリクエストはtinymq serverに送信され、チャネルサービス処理ごとに送信されることを覚えておいてください.
tinymq_controller :
%%
handle_call({push, Channel, Message}, From, State) ->
%% Channel pid,
{ChannelPid, NewState} = find_or_create_channel(Channel, State),
%% Channel process
gen_server:cast(ChannelPid, {From, push, Message}),
{noreply, NewState};
チャネルサービスがメッセージ送信要求を受信すると、
tinymq_channel_controller
( , , , ):
%%
handle_cast({From, push, Message}, State) ->
Now = now_to_micro_seconds(erlang:now()),
%%
LastPull = lists:foldr(fun({Ref, Sub}, _) ->
Sub ! {self(), Now, [Message]},
%% erlang:demonitor(Ref), monitor
Now %% :
end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers List, Channel
%% (publisher)
gen_server:reply(From, {ok, Now}),
%%
State2 = purge_old_messages(State),
%% (priorit,message, tree)
NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages),
%% :Subscribers are removed from the channel as soon as the first message is delivered :subscribers = []
{noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};
メッセージコードを送信するのも容易であることがわかりました.同様に、前節にリストされた送信図の論理図を振り返って理解を深めることができます.
これでtinymqのパブリケーション/サブスクリプションの主な実装コードを確認し、テストコードを書きます.
購読者:
-module(subscriber).
-export([start/1]).
start(Num) ->
create_subscriber(Num).
create_subscriber(0) -> ok;
create_subscriber(Num) ->
spawn(fun subscribe/0),
create_subscriber(Num - 1).
subscribe() ->
tinymq:subscribe("hello_channel", now, self()),
loop().
loop() ->
receive
{_From, _Timestamp, Messages} ->
io:format("Pid: ~p received messages: ~p~n", [self(), Messages])
end,
loop().
start関数のNumは、いくつかのサブスクライバを作成することを示しています.コードは簡単ですが、説明は省略します.
パブリッシャ
-module(publisher).
-export([start/0]).
start() ->
spawn(fun publish/0).
publish() ->
Messages = {hello_erlang, erlang:now()},
io:format("Pid: ~p send messages: ~p~n", [self(), Messages]),
tinymq:push("hello_channel", Messages),
receive
after 5000 ->
true
end,
publish().
パブリッシャーは5秒ごとにチャンネルにメッセージを送信する
最後に、実行効果図を見てみましょう
TinyMQのソースコードについてはここまでですが、詳細は私が掘り起こす価値があります(例えばmonitorSubscriber、優先順位キュー実装アルゴリズム)、サービス状態の持続化、unsubscribeリクエスト処理の追加など)、独自のerlang基盤の再補完が必要です.公式ドキュメントを見てみましょう.
ps:iteyeのerlangコードのハイライトは本当に...、お疲れ様でした....