activemq5.8使用手記
17726 ワード
ActiveMQで使用する設定ファイル
起動方法:
cd /home/apache-activemq-5.8.0/bin/linux-x86-64/
./activemq start
./activemq stop
送信側関連コード
connectionはプログラムの起動時に一度だけ作成すればよく、その後は必要に応じてセッションを作成して具体的な操作を行う.
一度に1つのメッセージしか送信しない場合は、トランザクション機能をオンにしないほうがよい.性能に大きな影響を及ぼし、テストではTPSが約10分の1に低下した.
受信側:
同様に、connectionはプログラムの起動時に一度だけ作成する.複数のセッションを作成すると、複数のスレッドで並行して受信できる.
性能テストは、2000 TPSのメッセージ送信量に達することができ、性能と安定性は信頼できる.
サーバのメッセージのリアルタイム統計は、http://xxx.xxx.xxx.xxx:8161/admin/queues.jsp で表示することができる.
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<!--
For better performances use VM cursor and small memory limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false">
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="false" memoryLimit="1024mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
If using ActiveMQ embedded - the following limits could safely be used:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="512 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="10 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
起動方法:
cd /home/apache-activemq-5.8.0/bin/linux-x86-64/
./activemq start
./activemq stop
送信側関連コード
connectionはプログラムの起動時に一度だけ作成すればよく、その後は必要に応じてセッションを作成して具体的な操作を行う.
一度に1つのメッセージしか送信しない場合は、トランザクション機能をオンにしないほうがよい.性能に大きな影響を及ぼし、テストではTPSが約10分の1に低下した.
/**
 * Servlet that forwards the body of every HTTP POST to the ActiveMQ queue
 * {@code test.queue} as a non-persistent {@link TextMessage}.
 *
 * <p>One JMS {@link Connection} is opened in {@link #init()} and reused by all
 * requests; each request opens its own short-lived, non-transacted
 * {@link Session} and closes it when done.
 */
public class TestServlet extends HttpServlet {

    private static final long serialVersionUID = -8614928184491554864L;

    public static Logger logger = Logger.getLogger(TestServlet.class);

    ConnectionFactory connectionFactory;
    Connection connection = null;

    /**
     * Creates the connection factory and opens and starts the shared JMS connection.
     *
     * @throws ServletException if the connection cannot be created or started, so
     *         the servlet is not put into service with a {@code null} connection
     */
    @Override
    public void init() throws ServletException {
        System.out.println("init");
        // NOTE(review): broker URL and credentials are hard-coded here; consider
        // moving them to configuration.
        connectionFactory = new ActiveMQConnectionFactory(
                "system",
                "manager", "failover:(tcp://192.168.8.44:61616)?maxReconnectAttempts=-1");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
        } catch (JMSException e) {
            logger.error("failed to open JMS connection", e);
            // createConnection() may have succeeded before start() failed.
            closeConnectionQuietly();
            throw new ServletException("failed to open JMS connection", e);
        }
    }

    /** Delegates to {@link HttpServlet#doGet}; this servlet adds no GET handling of its own. */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        super.doGet(req, resp);
    }

    /**
     * Reads the request body as UTF-8 text, sends it to {@code test.queue} and
     * writes {@code "0"} to the response on success.
     *
     * <p>If the message cannot be sent, the failure is logged and HTTP 500 is
     * returned instead of an empty successful response.
     *
     * @param req  request whose body becomes the message text
     * @param resp response that receives {@code "0"} or an error status
     * @throws IOException if reading the request or writing the response fails
     */
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType("text/html");
        String body = readBody(req);

        Session session = null;
        try {
            // Non-transacted session: a transaction per single message costs throughput.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test.queue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage message = session.createTextMessage(body);
            producer.send(message);
        } catch (JMSException e) {
            logger.error("failed to send message to test.queue", e);
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            return;
        } finally {
            // Always release the session, including when create/send failed.
            closeQuietly(session);
        }

        resp.getWriter().println("0");
        resp.getWriter().close();
    }

    /** Closes the shared JMS connection; close failures are logged and otherwise ignored. */
    @Override
    public void destroy() {
        closeConnectionQuietly();
    }

    /**
     * Reads the whole request body as UTF-8. Lines are concatenated without their
     * line terminators.
     */
    private static String readBody(HttpServletRequest req) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream(), "UTF-8"));
        StringBuilder text = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            text.append(line);
        }
        return text.toString();
    }

    /** Closes a session if it was opened, logging rather than propagating close failures. */
    private static void closeQuietly(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException e) {
            logger.warn("failed to close JMS session", e);
        }
    }

    /** Best-effort close of the shared connection; the field is cleared afterwards. */
    private void closeConnectionQuietly() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Throwable ignored) {
            // Shutdown is best-effort: record the problem but never fail the caller.
            logger.warn("failed to close JMS connection", ignored);
        } finally {
            connection = null;
        }
    }
}
受信側:
同様に、connectionはプログラムの起動時に一度だけ作成する.複数のセッションを作成すると、複数のスレッドで並行して受信できる.
public class JMSReceive {
private static Logger logger = Logger.getLogger(JMSReceive.class.getName());
public static void main(String[] args) throws FileNotFoundException {
JMSReceive mr = new JMSReceive();
mr.receive();
}
public void receive()
{
try{
logger.info("jms connection to "+Constant.JMS_URL+" "+Constant.JMS_UIN+" "+Constant.JMS_PWD+"!");
ActiveMQConnectionFactory amqc = new ActiveMQConnectionFactory(Constant.JMS_UIN,Constant.JMS_PWD,Constant.JMS_URL);
Connection con = amqc.createConnection();
con.start();
logger.info("jms connectioned! "+Constant.JMS_QUEUE_NAME);
for (int i = 0; i < Constant.JMS_CONSUMER_COUNT; i++) {
Session se = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination des = se.createQueue(Constant.JMS_QUEUE_NAME);
MessageConsumer mc = se.createConsumer(des);
logger.info("jms consumer :"+i);
mc.setMessageListener(new MessageListener() {
DBService dbService = new DBService();
public void onMessage(Message message) {
try {
TextMessage tm = (TextMessage) message;
logger.info("Thread:"+Thread.currentThread().getId()+"rev:"+tm.getText());
String transactionId = "";
String queueType ="";
String comeFrom ="";
String userNumber ="";
String action ="";
String actionTime="";
String xml = "";
String statedate="";
String passId = "";
Document document = DocumentHelper.parseText(tm.getText());
Node TransactionIDNode = document.selectSingleNode("//OBJECT/HEADER/TransactionID");
Node QueueTypeNode = document.selectSingleNode("//OBJECT/HEADER/QueueType");
Node ComeFromNode = document.selectSingleNode("//OBJECT/HEADER/ComeFrom");
Node TimeStampNode = document.selectSingleNode("//OBJECT/HEADER/TimeStamp");
Node UserNumberNode = document.selectSingleNode("//OBJECT/BODY/UserNumber");
Node ActionNode = document.selectSingleNode("//OBJECT/BODY/Action");
Node ActionTimeNode = document.selectSingleNode("//OBJECT/BODY/ActionTime");
transactionId = getDOM4JNodeText(TransactionIDNode);
queueType = getDOM4JNodeText(QueueTypeNode);
comeFrom = getDOM4JNodeText(ComeFromNode);
userNumber = getDOM4JNodeText(UserNumberNode);
action = getDOM4JNodeText(ActionNode);
actionTime = getDOM4JNodeText(ActionTimeNode);
//
if(queueType.equals("80"))
{
action = getDOM4JNodeText(document.selectSingleNode("//OBJECT/BODY/LoginType"));
actionTime = getDOM4JNodeText(document.selectSingleNode("//OBJECT/BODY/OnlineTime"));
}
if(actionTime.isEmpty())
{
actionTime = Tools.getCurDateTime();
}
// xml ,
if(document.asXML().length()>1900)
{
xml = document.asXML().substring(0, 1900);
}
else
{
xml = document.asXML();
}
statedate = Tools.getCurrentDateTime();
//
long start = System.currentTimeMillis();
int ret = dbService.exeProcAddIntUserDayDetail(statedate, userNumber, transactionId, queueType,
comeFrom, action, actionTime, xml, passId);
long time = System.currentTimeMillis()-start;
if(ret==0)
{
logger.info("userNumber:"+userNumber+" transactionId:"+transactionId+" action:"+action +" ret="+ret+" time="+time);
}
else
{
FileUtil.writeErrorDat(xml);
logger.error("userNumber:"+userNumber+" transactionId:"+transactionId+" action:"+action +" ret="+ret+" errorxml:"+xml+" time="+time);
}
} catch (Exception e) {
logger.error("receive and parser message error!", e);
}
}
private String getDOM4JNodeText(Node node)
{
if(node==null)
return "";
if(node.getText()==null)
return "";
return node.getText();
}
});
}
}catch(Exception e)
{
logger.info("receive msg error!",e);
}
}
}
性能テストは、2000 TPSのメッセージ送信量に達することができ、性能と安定性は信頼できる.
サーバのメッセージのリアルタイム統計は、http://xxx.xxx.xxx.xxx:8161/admin/queues.jsp で表示することができる.