Netmq VS redisサブスクリプションパブリケーションパフォーマンススタディ
8271 ワード
簡単にご紹介します.
.NETの开発、手游びの开発に従事して8年余りあって、本当に爱して木があります
コードはここへhttps://github.com/GuojieChen/netmq-patterns-sample
現在、プロジェクトはビジネスに必要なテクノロジーを決定する過程で、メッセージキューの使用方法とパフォーマンスを選択する方法に遭遇しました.
現在、ネット上でいくつかの文章を見つけました.https://www.cnblogs.com/pasoraku/p/4673039.html海外:https://gist.github.com/hmartiro/85b89858d2c12ae1a0f9
外国のこの文章では
ZeroMQ, Pub/Sub: 481,000 msg/s, latency <1 ms Redis Pub/Sub (async via libevent): 59,000 msg/s, latency <1 ms
この結論(C+++に基づく)に対しては,これについては,しばらくシミュレーションを行っていない.
使用例の紹介
生産者:10個のスレッドで、データを絶えず生成し、ここでは時間の記時周期数DateTimeを用いる.Now.Ticksコンシューマ:データを受信し、消費時間を計算する10 sごとに毎秒の平均値を統計する
netmq
PubSub.Serverサブスクライバ、コンシューマ
PubSub.Proxyミドルエージェントは、正式な環境におけるProxyレイヤをシミュレートし、redisサーバと同等です.
PubSub.Client消費者、生産者
redisテストモデル
RedisPubSub.Server
RedisPubSub.Client
Netmq運転結果
redisテスト結果
結論はみんなでまとめて、しかも今は小量のデータのテスト以上に不適切なところがあるので、皆さんに指摘してもらいたいです.ありがとうございます.
.NETの开発、手游びの开発に従事して8年余りあって、本当に爱して木があります
コードはここへhttps://github.com/GuojieChen/netmq-patterns-sample
現在、プロジェクトはビジネスに必要なテクノロジーを決定する過程で、メッセージキューの使用方法とパフォーマンスを選択する方法に遭遇しました.
現在、ネット上でいくつかの文章を見つけました.https://www.cnblogs.com/pasoraku/p/4673039.html海外:https://gist.github.com/hmartiro/85b89858d2c12ae1a0f9
外国のこの文章では
ZeroMQ, Pub/Sub: 481,000 msg/s, latency <1 ms Redis Pub/Sub (async via libevent): 59,000 msg/s, latency <1 ms
この結論(C+++に基づく)に対しては,これについては,しばらくシミュレーションを行っていない.
使用例の紹介
生産者:10個のスレッドで、データを絶えず生成し、ここでは時間の記時周期数DateTimeを用いる.Now.Ticksコンシューマ:データを受信し、消費時間を計算する10 sごとに毎秒の平均値を統計する
netmq
PubSub.Serverサブスクライバ、コンシューマ
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace PubSub.Server
{
class Program
{
static void Main(string[] args)
{
SubscriberSocket socket = new SubscriberSocket(">tcp://localhost:1012");
socket.Subscribe("aaa");
socket.ReceiveReady += Socket_ReceiveReady;
NetMQ.NetMQPoller poller = new NetMQ.NetMQPoller();
poller.Add(socket);
poller.RunAsync();
Task.Factory.StartNew(() =>
{
while (true)
{
Thread.Sleep(1*1000);
var tmp = d;
d = new List();
if (tmp.Any())
Console.WriteLine($"{DateTime.Now}\t{tmp.Count / 1}/s\t{tmp.Average(x => x.TotalMilliseconds)}");
else
Console.WriteLine("-");
}
});
Console.Read();
}
private static List d = new List();
private static void Socket_ReceiveReady(object sender, NetMQ.NetMQSocketEventArgs e)
{
var topic = e.Socket.ReceiveFrameString();
var dt = e.Socket.ReceiveFrameString();
//Console.WriteLine(dt);
var t = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(dt)));
d.Add(t);
}
}
}
PubSub.Proxyミドルエージェントは、正式な環境におけるProxyレイヤをシミュレートし、redisサーバと同等です.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace PubSub.Proxy
{
class Program
{
static void Main(string[] args)
{
NetMQ.Proxy proxy = new NetMQ.Proxy(new XSubscriberSocket("@tcp://*:1011"),new XPublisherSocket("@tcp://*:1012"));
Console.WriteLine("running...");
proxy.Start();
}
}
}
PubSub.Client消費者、生産者
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace PubSub.Client
{
class Program
{
private static PublisherSocket socket = new PublisherSocket(">tcp://localhost:1011");
private static NetMQQueue queue = new NetMQQueue();
static void Main(string[] args)
{
queue.ReceiveReady += (s, e) =>
{
var msg = e.Queue.Dequeue();
socket.SendMoreFrame("aaa").SendFrame(msg);
};
NetMQPoller poller = new NetMQPoller();
poller.Add(queue);
poller.RunAsync();
for (var i = 0; i < 100; i++)
{
Task.Factory.StartNew(Run, i);
}
Console.WriteLine("running...");
Console.Read();
}
private static void Run(object obj)
{
while (true)
{
queue.Enqueue(Convert.ToString(DateTime.Now.Ticks));
Thread.Sleep(1);
}
}
}
}
redisテストモデル
RedisPubSub.Server
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RedisPubSub.Server
{
class Program
{
private static ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");
private static List d = new List();
static void Main(string[] args)
{
var date = DateTime.Now;
ConnectionMultiplexer.GetSubscriber().Subscribe("aaa", (c, m) =>
{
var t = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(m)));
d.Add(t);
});
Task.Factory.StartNew(() =>
{
while (true)
{
Thread.Sleep(1 * 1000);
var tmp = d;
d = new List();
if (tmp.Any())
Console.WriteLine($"{DateTime.Now}\t{tmp.Count / 1}/s\t{tmp.Average(x => x.TotalMilliseconds)}");
else
Console.WriteLine("-");
}
});
Console.Read();
}
}
}
RedisPubSub.Client
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace RedisPubSub.Client
{
class Program
{
private static ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");
static void Main(string[] args)
{
for (var i = 0; i < 100; i++)
{
Task.Factory.StartNew(Run,i);
}
Console.WriteLine("running...");
Console.Read();
}
private static void Run(object obj)
{
while (true)
{
ConnectionMultiplexer.GetSubscriber().Publish("aaa", DateTime.Now.Ticks);
Thread.Sleep(1);
}
}
}
}
Netmq運転結果
2018/9/5 21:38:07 4810/s 0.0773472247630134
2018/9/5 21:38:17 4953/s 0.0345690630488765
2018/9/5 21:38:27 4956/s 0.0315224333756985
2018/9/5 21:38:37 4838/s 0.0788584731751667
2018/9/5 21:38:47 4968/s 0.0267414818244033
2018/9/5 21:38:57 4927/s 0.0456701723053194
2018/9/5 21:39:07 4921/s 0.0432689302042063
2018/9/5 21:39:17 4927/s 0.0381800129878038
2018/9/5 21:39:27 4816/s 0.0886798555027297
2018/9/5 21:39:37 4666/s 0.118044937753114
2018/9/5 21:39:47 4938/s 0.0384998096665115
2018/9/5 21:39:57 4956/s 0.029773836282561
2018/9/5 21:40:07 4947/s 0.0333202635781134
2018/9/5 21:40:17 4846/s 0.0800275512730572
redisテスト結果
2018/9/5 21:41:18 4948/s 0.0181999898967447
2018/9/5 21:41:28 4987/s 0.0271869203472404
2018/9/5 21:41:38 4988/s 0.0162716218925421
2018/9/5 21:41:48 4995/s 0.0106067283246252
2018/9/5 21:41:58 4991/s 0.00916983653191234
2018/9/5 21:42:08 4983/s 0.0864149871578777
2018/9/5 21:42:18 4989/s 0.0139901451076282
2018/9/5 21:42:28 4993/s 0.0140108803620779
2018/9/5 21:42:38 4988/s 0.0111812829507868
2018/9/5 21:42:48 4994/s 0.00812532134705482
2018/9/5 21:42:58 4991/s 0.0198666673345088
2018/9/5 21:43:08 4987/s 0.025591579622687
2018/9/5 21:43:18 4988/s 0.0872425344264263
2018/9/5 21:43:28 4989/s 0.0124017437665357
2018/9/5 21:43:38 4992/s 0.0293738252613868
2018/9/5 21:43:48 4972/s 0.0869289167286693
結論はみんなでまとめて、しかも今は小量のデータのテスト以上に不適切なところがあるので、皆さんに指摘してもらいたいです.ありがとうございます.