PythonからSTOMPプロトコルでHornetQに接続する

13093 ワード

===================================== 2014-03-28 =========================================
stomp.pyの公式サイトアドレス:https://pypi.python.org/pypi/stomp.py
 
サンプルのURL: https://github.com/jasonrbriggs/stomp.py/wiki/Simple-Example
stomp.py 4.0.11(添付ファイルを参照)
サンプルコード:
# -*-coding:utf-8-*-
'''
Created on 2014-3-28

@author: xuepeng
'''

from logging.handlers import TimedRotatingFileHandler
import logging
import os
import stomp
import sys
import time


def initLogger(logFileName):
    """Set up the root logger with a daily-rotating file handler and return it.

    The directory part of ``logFileName`` is created when it is missing, and
    an empty log file is created when the file does not exist yet.  The root
    logger is configured through ``logging.basicConfig`` at INFO level, and a
    ``TimedRotatingFileHandler`` (rotating once per day, keeping all backups,
    UTF-8 encoded) using the same format is attached to it.

    Args:
        logFileName: Path of the log file.  May be a bare file name, in which
            case the file is created in the current working directory.

    Returns:
        The root logger.
    """
    log_dir = os.path.dirname(logFileName)
    # A bare file name has an empty directory part; os.makedirs('') would
    # raise, so only create the directory when there is one to create.
    if log_dir and not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    if not os.path.isfile(logFileName):
        with open(logFileName, 'w'):
            pass

    log_format = '%(asctime)s [%(threadName)s](%(levelname)s) %(pathname)s(%(funcName)s:%(lineno)s) --> %(message)s'
    # level = logging.DEBUG
    level = logging.INFO
    logging.basicConfig(level=level, format=log_format)

    file_handler = TimedRotatingFileHandler(logFileName, when='D', interval=1, backupCount=0, encoding='utf-8', delay=False, utc=False)
    file_handler.setFormatter(logging.Formatter(log_format))

    root_logger = logging.getLogger()
    root_logger.addHandler(file_handler)
    return root_logger


# Configure root logging as soon as this module is loaded.
# NOTE(review): the log path is hard-coded with an 'e:' drive prefix --
# presumably Windows-only; confirm before running elsewhere.
logger = initLogger('e:/logs/simple.log')

class MyListener(object):
    """Listener that echoes errors and incoming messages to stdout."""

    def on_error(self, headers, message):
        """Print the error text passed in ``message``; ``headers`` is unused."""
        line = 'received an error %s' % message
        print(line)

    def on_message(self, headers, message):
        """Print the message text passed in ``message``; ``headers`` is unused."""
        line = '----->received a message %s' % message
        print(line)

def main():
    """Send the command-line arguments to the test queue 1000 times.

    Connects to the broker at ``mq-node1:61613``, registers a ``MyListener``,
    subscribes to the same queue it publishes to, and sends the
    space-joined ``sys.argv[1:]`` as the body of every message.  The
    connection is closed even if subscribing or sending raises.
    """
    dest = 'jms.queue.com.wanmei.mq.test.xuepeng'
    message_count = 1000

    conn = stomp.Connection(host_and_ports=[('mq-node1', 61613)])
    conn.set_listener('', MyListener())
    conn.start()
    conn.connect()
    try:
        conn.subscribe(destination=dest, id=1, ack='auto')

        msg = ' '.join(sys.argv[1:])
        for _ in range(message_count):
            conn.send(body=msg, destination=dest)
            # Single-argument call form so the output is the same text under
            # both the print statement and the print function.
            print('send message  %s' % msg)

        # Presumably gives the listener time to receive the echoed messages
        # before the connection is closed -- confirm.
        time.sleep(2)
    finally:
        conn.disconnect()

# Run the demo only when this file is executed as a script.
if __name__ == '__main__':
    main()

次のコマンドでこのコードを実行します。
python E:\workspace_python\Demo\src\com\wanmei\stomp\simple.py hello world 123
==============================================================================
あらためてHornetQを調べてみました。Pythonを独学したので、JavaだけでなくPythonからもHornetQに接続して開発してみたいと考えたためです。HornetQのマニュアルを読むと、HornetQはSTOMPメッセージの永続化をサポートしていないことが分かります。これは大きな欠点です。それでも、プログラミング言語をまたいで利用できる機能はサポートされているので、簡単なテストを書いて試してみました。実行を開始すると、データが失われることが分かりました。この問題はJBossでも言及されていますが、詳細は引き続き調査中です。HornetQのSTOMPサポートについてはマニュアルを参照してください。長時間テストした結果、データが失われるのは最初の短い時間で、途中の動作は安定していることが分かりました。データ損失に敏感でないアプリケーションであれば、試してみることができます。さらに詳しい調査が必要です。
以下にサンプルコードを示します。
 
#-*-coding:utf-8-*-
'''
Created on 2012-2-20

'''
import logging
import stomp
import time


 
# Install the default root logging configuration once; repeating the call
# has no further effect once the root logger has a handler.
logging.basicConfig()

# Destination the producer publishes to.
# To publish to a topic instead, use 'jms.topic.TestTopic'.
dest = 'jms.queue.TestQueue'
class MyListener(stomp.ConnectionListener):
    """Listener that prints errors and received messages; other events are ignored."""

    def on_error(self, headers, message):
        """Print the error text passed in ``message``."""
        print('received an error %s' % message)

    def on_message(self, headers, message):
        """Print a separator line followed by the received message text."""
        print('--------------------------------------')
        #for k, v in headers.iteritems():
        #    print('header: key %s , value %s' % (k, v))
        print('received message\n%s' % message)

    def on_disconnected(self):
        """Called by the STOMP connection when the TCP/IP connection to the server is lost.

        No messages should be sent via the connection until it has been
        re-established.
        """
        pass

    def on_connecting(self, host_and_port):
        """Called by the STOMP connection once a TCP/IP connection is (re-)established.

        At this point no connection exists on the STOMP protocol level yet;
        for that, the "connect" method must be invoked on the connection.

        Args:
            host_and_port: tuple of the host name and port number the
                connection was established to.
        """
        pass

    def on_connected(self, headers, body):
        """Called by the STOMP connection when a CONNECTED frame is received.

        Args:
            headers: dictionary of all headers sent by the server.
            body: the frame's payload; usually empty for CONNECTED frames.
        """
        pass

    def on_heartbeat_timeout(self):
        """Called by the STOMP connection when no heartbeat arrived within the specified period."""
        pass

    def on_receipt(self, headers, body):
        """Called by the STOMP connection when a RECEIPT frame is received.

        The server sends one when the client requested it with the
        'receipt' header.

        Args:
            headers: dictionary of all headers sent by the server.
            body: the frame's payload; usually empty for RECEIPT frames.
        """
        pass

    def on_send(self, headers, body):
        """Called by the STOMP connection while it is in the process of sending a message.

        Args:
            headers: dictionary of the headers that will be sent.
            body: the message payload.
        """
        pass


# Producer script: keeps publishing batches of numbered messages until an
# error outside the per-message handler (or an interrupt) stops it.
conn = None
try:
    conn = stomp.Connection([('192.168.123.74', 61613)])
    conn.set_listener('somename', MyListener())
    print('set up Connection')
    conn.start()
    print('started connection')
    conn.connect(wait=True)
    print('connected')
    while True:
        count = 99999
        for num in range(1, count + 1):
            try:
                message = 'hello world ' + str(num)
                # NOTE(review): the `message=` / `ack=` keywords presumably
                # target an older stomp.py release; newer releases take the
                # payload as `body=` -- confirm against the installed version.
                conn.send(message=message, destination=dest, headers={'type': 'textMessage'}, ack='auto')
                #print('sent message: %s' % message)
            except Exception as e:
                # A failed send is reported and the batch carries on.
                print('============== %s' % e)
        print('It has produce ' + str(count) + ' messages')
        time.sleep(2)
except Exception as e:
    print('-----------------  %s' % e)
finally:
    print('slept')
    # The connection object does not exist if its construction failed.
    if conn is not None:
        conn.disconnect()
    print('disconnected')

 
 
#-*-coding:utf-8-*-
'''
Created on 2012-2-20

'''
import logging
import stomp
import time

# Install the default root logging configuration.
# NOTE(review): presumably here so log output from the stomp library is
# visible on the console -- confirm.
logging.basicConfig()
class MyListener(stomp.ConnectionListener):
    """Listener that prints received messages and reconnects when the connection drops."""

    def __init__(self, conn, headers):
        """Remember the connection being listened to and the headers it connected with.

        Args:
            conn: the connection this listener is registered on; it is
                queried in ``on_disconnected``.
            headers: the headers passed to ``connect``; stored on the
                instance.
        """
        super(MyListener, self).__init__()
        self.conn = conn
        self.headers = headers

    def on_error(self, headers, message):
        """Print the error text passed in ``message``."""
        print('received an error %s' % message)

    def on_message(self, headers, message):
        """Print a separator line followed by the received message text."""
        print('--------------------------------------')
        #for k, v in headers.iteritems():
        #    print('header: key %s , value %s' % (k, v))
        print('received message\n%s' % message)

    def on_disconnected(self):
        """Called by the STOMP connection when the TCP/IP connection to the server is lost.

        No messages should be sent via the connection until it has been
        re-established.  If the stored connection reports that it is not
        connected, wait five seconds and start a new consumer by calling
        ``consume()``.
        """
        if self.conn.is_connected():
            return
        sleepTime = 5
        print('Error: conn failure! try to connection again')
        print('+++++++++++++++++++++++++++++++++++++++++++')
        print('it will sleep ' + str(sleepTime) + ' seconds.')
        time.sleep(sleepTime)
        # consume() opens a fresh connection and never returns.
        consume()

    def on_connecting(self, host_and_port):
        """Called by the STOMP connection once a TCP/IP connection is (re-)established.

        At this point no connection exists on the STOMP protocol level yet;
        for that, the "connect" method must be invoked on the connection.

        Args:
            host_and_port: tuple of the host name and port number the
                connection was established to.
        """
        pass

    def on_connected(self, headers, body):
        """Called by the STOMP connection when a CONNECTED frame is received.

        Args:
            headers: dictionary of all headers sent by the server.
            body: the frame's payload; usually empty for CONNECTED frames.
        """
        pass

    def on_heartbeat_timeout(self):
        """Called by the STOMP connection when no heartbeat arrived within the specified period."""
        pass

    def on_receipt(self, headers, body):
        """Called by the STOMP connection when a RECEIPT frame is received.

        The server sends one when the client requested it with the
        'receipt' header.

        Args:
            headers: dictionary of all headers sent by the server.
            body: the frame's payload; usually empty for RECEIPT frames.
        """
        pass

    def on_send(self, headers, body):
        """Called by the STOMP connection while it is in the process of sending a message.

        Args:
            headers: dictionary of the headers that will be sent.
            body: the message payload.
        """
        pass


def consume():
    """Connect to the broker, subscribe to the test queue and block forever.

    Received messages are handled by ``MyListener``.  The function never
    returns: after subscribing it idles in an endless loop.
    """
    dest = 'jms.queue.TestQueue'
    #dest = 'jms.topic.TestTopic'
    clientId = 919191
    headers = {'client-id': clientId}

    conn = stomp.Connection([('192.168.123.74', 61613)])
    print('set up Connection')
    conn.set_listener('somename', MyListener(conn, headers))
    print('Set up listener')
    conn.start()
    print('started connection')
    conn.connect(wait=True, headers=headers)
    print('connected')
    # NOTE(review): newer stomp.py releases presumably also require an `id`
    # argument here -- confirm against the installed version.
    conn.subscribe(destination=dest, ack='auto')
    print('subscribed')

    # Idle without spinning the CPU while the listener handles messages.
    while True:
        time.sleep(1)

    # Not reached while the loop above never exits; kept to show the
    # intended shutdown sequence.
    print('slept')
    conn.disconnect()
    print('disconnected')


if __name__ == '__main__':
    consume()