這篇文章給大家分享的是有關spring整合JMS如何實現同步收發消息的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
1. 安裝ActiveMQ
注意:JDK版本需要1.7及以上才行
到Apache官方網站下載最新的ActiveMQ的安裝包,并解壓到本地目錄下,下載鏈接如下:http://activemq.apache.org/download.html,解壓后的目錄結構如下:
bin目錄結構如下:


如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat,運行結果如下:

啟動成功!成功之后在瀏覽器輸入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理頁面,用戶名和密碼默認都是admin,如下:

2. 新建一個Maven工程,并配置pom文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.chhliu.myself</groupId>
<artifactId>activemq_start</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>activemq_start</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-version>3.2.5.RELEASE</spring-version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>jsr250-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
</project>3. 配置連接工廠(ConnectionFactory)
Spring給我們提供了如下的連接工廠:

其中SingleConnectionFactory保證每次返回的都是同一個連接,CachingConnectionFactory繼承了SingleConnectionFactory,在保證同一連接的同時,增加了緩存的功能,可以緩存Session以及生產者,消費者。當然,JMS提供的連接工廠只是用來實現管理的,并不是真正連接MQ的,真正的連接工廠需要具體的MQ廠商提供,下面我們以ActiveMQ為例來說明,配置如下:
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean>
為了減少我們連接的資源消耗,ActiveMQ為我們提供了一個連接工廠管理池--PooledConnectionFactory,通過連接工廠池,可以將Connection,Session等都放在池里面,用的時候直接返回池里面的內容,無需臨時建立連接,節約開銷。配置如下:
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <!-- 通過往PooledConnectionFactory注入一個ActiveMQConnectionFactory可以用來將Connection,Session和MessageProducer池化這樣可以大大減少我們的資源消耗, --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="10" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> </bean>
4. 配置JmsTemplate
配置好連接工廠之后,就需要配置JMS的JmsTemplate,JmsTemplate的作用和JdbcTemplate類似,我們發送和接收消息,都是通過JmsTemplate來實現的,配置如下:
<!-- 配置生產者:配置好ConnectionFactory之后我們就需要配置生產者。生產者負責產生消息并發送到JMS服務器,這通常對應的是我們的一個業務邏輯服務實現類。 但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的, 所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對于消息發送者而言,它在發送消息的時候要知道自己該往哪里發, 為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象 --> <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean>
5. 生產者實現
配置完這些之后,我們就可以寫代碼實現生產者和消費者了,生產者主要用來生產消息,并向目的隊列中推送消息,接口定義如下:
public interface ProducerService {
void sendMessage(Destination destination, final String message);
}實現類代碼如下:
@Service("producerServiceImpl")
public class ProducerServiceImpl implements ProducerService {
/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;
/**
* attention:
* Details:發送消息
* @author chhliu
* 創建時間:2016-7-28 下午2:33:14
* @param destination
* @param message
*/
@Override
public void sendMessage(Destination receivedestination, final String message) {
System.out.println("================生產者創建了一條消息==============");
jTemplate.send(receivedestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello acticeMQ:"+message);
}
});
}
}6. 消費者實現
假設生產者已經創建了一條消息,并推送到了對應的隊列中,消費者需要從這個隊列中取出消息,并同時回復一條報文,自己已經收到了這條消息,為了測試回復報文的功能,我們下面會將回復報文放到另一個隊列中,此例使用同步接收消息的方式,而不是異步監聽的方式實現,接口定義如下:
public interface ConsumerService {
String receiveMessage(Destination destination, Destination replyDestination);
}實現類代碼如下:
@Service("consumerServiceImpl")
public class ConsumerServiceImpl implements ConsumerService {
/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;
/**
* attention:
* Details:接收消息,同時回復消息
* @author chhliu
* 創建時間:2016-7-28 下午2:39:45
* @param destination
* @return
*/
@Override
public String receiveMessage(Destination destination, Destination replyDestination) {
/**
* 接收消息隊列中的消息
*/
Message message = jTemplate.receive(destination);
try {
/**
* 此處為了更好的容錯性,可以使用instanceof來判斷下消息類型
*/
if(message instanceof TextMessage){
String receiveMessage = ((TextMessage) message).getText();
System.out.println("收到生產者的消息:"+receiveMessage);
/**
* 收到消息之后,將回復報文放到回復隊列里面去
*/
jTemplate.send(replyDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("消費者已經收到生產者的消息了,這是一條確認報文!");
}
});
return receiveMessage;
}
} catch (JMSException e) {
e.printStackTrace();
}
return "";
}
}生產者和消費者實現之后,我們要做的就是配置隊列了,下面給出項目完整的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-3.2.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd"> <!-- 掃描注解包 --> <context:annotation-config /> <context:component-scan base-package="com.chhliu.myself.activemq.start"></context:component-scan> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="10" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!-- 在真正利用JmsTemplate進行消息發送的時候,我們需要知道消息發送的目的地,即destination。 在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。 默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行注入, defaultDestinationName對應的就是一個普通字符串 --> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>NTF_MOCK_INPUT</value> </constructor-arg> </bean> <!--這個是回復隊列,點對點的 --> <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>NTF_MOCK_OUTPUT</value> </constructor-arg> </bean> </beans>
到這里,所有的代碼和配置文件就都整好了,下面就是進行測試,測試代碼如下:
生產者測試代碼:
package com.chhliu.myself.activemq.start.sync;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncProducerActiveMQTest {
@Resource(name="producerServiceImpl")
private ProducerService pService;
@Resource(name="queueDestination")
private Destination receiveQueue;
@Test
public void producerTest(){
pService.sendMessage(receiveQueue, "my name is chhliu!");
}
}消費者測試代碼:
package com.chhliu.myself.activemq.start.sync;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncConsumerActiveMQTest {
@Resource(name="consumerServiceImpl")
private ConsumerService cService;
@Resource(name="queueDestination")
private Destination receiveQueue;
@Resource(name="responseQueue")
private Destination replyQueue;
@Test
public void producerTest(){
String result = cService.receiveMessage(receiveQueue, replyQueue);
System.out.println(result);
}
}測試結果如下:
生產者測試結果: ================生產者創建了一條消息============== 消費者測試結果: 收到生產者的消息:hello acticeMQ:my name is chhliu! hello acticeMQ:my name is chhliu!
再來看下ActiveMQ的管理頁面的結果:

從管理頁面中可以看到,生產者生產了消息,并且入隊列了,同時消費者也消費了消息,并將回復消息放到了回復隊列中,測試成功。
但是這種同步取消息的方式有個缺點,每次只會取一條消息消費,取完之后就會一直阻塞,下面來測試一下:首先讓生產者再生產5條消息,然后運行消費者程序,發現會只消費一條消息,除非我們在消費者程序里面加while(true),一直輪詢隊列,這種實現方式不僅耗內存,效率也不是很高,后面,我們會對這種方式進行改進,使用異步監聽模式,測試效果如下:
生產者創建了5條消息:
=======生產者創建了一條消息========
=======生產者創建了一條消息========
=======生產者創建了一條消息========
=======生產者創建了一條消息========
======生產者創建了一條消息=========
ActiveMQ管理頁面如下:

消費者消費一條消息:
收到生產者的消息:hello acticeMQ:my name is chhliu!
hello acticeMQ:my name is chhliu!
消費者消費消息后,ActiveMQ管理頁面如下:

從上面的對比中,我們可以看出來,同步模式下,消費者消費消息時,是逐條消費,每次只消費一條消息。
感謝各位的閱讀!關于“spring整合JMS如何實現同步收發消息”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。