メッセージキューactiveMQ+spring(テストdome)

10582 ワード

メッセージキューactiveMQ+spring(テストdome)
mavenを使用したプロジェクトの作成
    mvn archetype:generate

背景
    base-db--->kettle--->warehouse-db(X),warehouse-dbbase-db;

イニシアチブ
                 ,            ,         .            .

インプリメンテーション
環境
    jdk1.8+activemq(docker)+spring+maven

環境構築
    idea(   )  spring+maven  

pom.xml(非完全)
     
        5.1.5.RELEASE
    
    
        
            junit
            junit
            4.13-beta-3
            test
        
        
            org.springframework
            spring-beans
            ${sb.version}
        
        
            org.springframework
            spring-core
            ${sb.version}
        
        
            org.springframework
            spring-jms
            ${sb.version}
        
        
            org.springframework
            spring-test
            ${sb.version}
        
        
            org.springframework
            spring-context
            ${sb.version}
        
        
            org.apache.activemq
            activemq-core
            5.5.0
            
                
                    org.slf4j
                    slf4j-api
                
            
        
        
            org.apache.activemq
            activemq-pool
            5.7.0
        
        
        
            com.fasterxml.jackson.core
            jackson-databind
            2.9.8
        
        
        
            org.apache.xbean
            xbean-spring
            4.14
        
    

メッセージ・エンティティ・クラスの作成
メッセージはシーケンス化する必要があります
	package com.mq.core.now;

	import java.io.Serializable;

	public class Spittle implements Serializable{


    private String name;

    private String age;

    public Spittle(String name, String age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "name:"+name+",age:"+age;
    }
}


インタフェースの作成
簡単なメッセージの送信とメッセージの受信のみを提供し、本稿ではポイント・ツー・ポイント・メッセージ・キュー操作を実現し、トピック・サブスクリプション放送の読者が自分で試みる
   package com.mq.core.now;

public interface AlertService {
   /**
    *
    * @param spittle
    */
   void sendSpittleAlert(Spittle spittle,String spittleName);

   void sendSpittleAlert(Spittle spittle);

   public Spittle receiveSpittleAlert();

   public Spittle receiveSpittleAlert(String spittleName);

}


インタフェース実装クラス
   package com.mq.core.now;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.support.JmsUtils;

import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Session;


public class AlertServiceImpl implements AlertService {

   /**
    *   jms  
    * @param jmsOperations
    */
   @Autowired
   private JmsOperations jmsOperations;



   /**
    *     
    * @param spittle
    */
   @Override
   public void sendSpittleAlert(final Spittle spittle) {
       /**
        *   ,    
        */
       jmsOperations.send((Session session) -> {
           return session.createObjectMessage(spittle);
       });
   }

   /**
    *     (        )
    * @param spittle
    */
   @Override
   public void sendSpittleAlert(final Spittle spittle,String spittleName) {
       /**
        *   ,    
        */
       jmsOperations.send(spittleName,(Session session) -> session.createObjectMessage(spittle));
   }

   /**
    *     (   ,         )
    * @return
    */
   @Override
   public Spittle receiveSpittleAlert() {
       try {
           ObjectMessage receiveMessage = (ObjectMessage) jmsOperations.receive();
           return (Spittle) receiveMessage.getObject();
       } catch (JMSException e) {
           //    
          throw JmsUtils.convertJmsAccessException(e);
       }
   }

   /**
   *            
   */
   @Override
   public Spittle receiveSpittleAlert(String spittleName) {
       try {
           ObjectMessage receiveMessage = (ObjectMessage) jmsOperations.receive(spittleName);
           return (Spittle) receiveMessage.getObject();
       } catch (JMSException e) {
           //    
           throw JmsUtils.convertJmsAccessException(e);
       }
   }
}


spring-config.xml



   
   
   
   

   
   
   
   
   
       
   
   
   
   

   


テスト
	package com.mq;

import com.mq.core.now.AlertService;
import com.mq.core.now.Spittle;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-config.xml"})
public class TestActiveMQ {
   @Autowired
   private AlertService alertService;

   @Test
   public void test01(){
       for (int i = 0; i < 3 ; i++) {
           System.out.printf("----    ----");
           Spittle xuzz = new Spittle("xuzz", "23"+i);
           alertService.sendSpittleAlert(xuzz);
           System.out.println("      ");

       }
   }

   @Test
   public void testPOP(){
       System.out.printf("----      ----");
       Spittle spittle = alertService.receiveSpittleAlert();
       System.err.println(spittle.toString());
   }

   @Test
   public void testPOP1(){
       while(true){
           System.err.println(alertService.receiveSpittleAlert("yb-push").toString());
           new Thread(()-> {
               try {
                   Thread.sleep(1000L);
                   System.out.println("------------"+Thread.currentThread().getId());
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }).start();
       }

   }
   @Test
   public void testPOP2(){
               while(true){
                   System.err.println(alertService.receiveSpittleAlert("yb-push").toString());
                   new Thread(()-> {
                       try {
                           Thread.sleep(1000L);
                           System.out.println("------------"+Thread.currentThread().getId());
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                   }).start();
               }
   }
}


情報を受信とき、消費者は、メッセージが来るまで動作を完了する必要がある場合がある、そうでなければ、メッセージの到来を待つ必要がある.リソースを占有し続ける場合、メッセージリスナーを用いてメッセージの非同期受信を実現することができる.
	package com.mq.core.now;


import org.springframework.beans.factory.annotation.Autowired;

/**
* spring MDP            
*/
public class SpittleAlertHandler {

   @Autowired
   private AlertService alertService;

   /**
    *           
    * @param spittle
    */
   public void handlerSpittleAlter(Spittle spittle){
         //...            
       //             ,
       alertService.sendSpittleAlert(spittle,"yb-push");
   }
}


文章の书き方は简単で、多くの地方は言っていません.后の文章あるいは本文章は引き続き完璧になります