1、創(chuàng)建普通的maven項(xiàng)目11-activemq-java
2、在pom.xml文件中加入jms 和 activemq 的相關(guān)依賴
<!-- JMS規(guī)范的jar依賴 -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- activeMQ對(duì)jms具體實(shí)現(xiàn)的jar依賴 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.8</version>
</dependency>
3、在com.bjpowernode.activemq.send包下編寫一個(gè)消費(fèi)發(fā)送者QueueSender發(fā)送消息
package com.bjpowernode.activemq.send;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class QueueSender {
public static final String BROKER_URL = "tcp://192.168.235.128:61616";
//相當(dāng)于一個(gè)數(shù)據(jù)庫(kù)(其實(shí)是一個(gè)隊(duì)列)
public static final String DESTINATION = "myQueue";
public static void main(String[] args) {
sendMessage();
}
public static void sendMessage(){
//1 .創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageProducer messageProducer = null;
try {
//2. 獲取一個(gè)連接
connection = connectionFactory.createConnection();
//3. 創(chuàng)建一個(gè)Session 第一個(gè)參數(shù):是否是事務(wù)消息 第二個(gè)參數(shù):消息確認(rèn)機(jī)制(自動(dòng)確認(rèn)還是手動(dòng)確認(rèn))
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4. 有了session之后,就可以創(chuàng)建消息,目的地,生產(chǎn)者和消費(fèi)者
Message message = session.createTextMessage("Hello ActiveMQ");
//目的地
Destination destination = session.createQueue(DESTINATION);
//生產(chǎn)者
messageProducer = session.createProducer(destination);
//發(fā)消息 沒有返回值,是非阻塞的
messageProducer.send(message);
} catch (JMSException e) {
e.printStackTrace();
}finally{
try {
if(messageProducer != null){
messageProducer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
}catch (JMSException e) {
e.printStackTrace();
}
}
}
}
4、運(yùn)行程序,連接ActiveMQ的web控制臺(tái)查看
5、啟動(dòng)提示sl4j日志沒有實(shí)現(xiàn),在pom.xml文件中添加slf4j-simple 1.7.25,或者slf4j-nop 1.7.25
<!--slf4j的簡(jiǎn)單實(shí)現(xiàn)-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
6、在com.bjpowernode.activemq.receive包下編寫一個(gè)消費(fèi)接收QueueReceiver接收消息
package com.bjpowernode.activemq.receive;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class QueueReceiver {
public static final String BROKER_URL = "tcp://192.168.235.128:61616";
//相當(dāng)于一個(gè)數(shù)據(jù)庫(kù)(其實(shí)是一個(gè)隊(duì)列)
public static final String DESTINATION = "myQueue";
public static void main(String[] args) {
receiveMessage();
}
public static void receiveMessage(){
//1 .創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//2. 獲取一個(gè)連接
connection = connectionFactory.createConnection();
//接收消息,需要將連接啟動(dòng)一下,才可以接收到消息
connection.start();
//3. 創(chuàng)建一個(gè)Session 第一個(gè)參數(shù):是否是事務(wù)消息 第二個(gè)參數(shù):消息確認(rèn)機(jī)制(自動(dòng)確認(rèn)還是手動(dòng)確認(rèn))
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4. 有了session之后,就可以創(chuàng)建消息,目的地,生產(chǎn)者和消費(fèi)者
//目的地
Destination destination = session.createQueue(DESTINATION);
//消費(fèi)者
messageConsumer = session.createConsumer(destination);
//循環(huán)接收消息
while (true){
//接收消息 有返回值,是阻塞的
Message message = messageConsumer.receive();
//判斷消息類型
if(message instanceof TextMessage){
String text = ((TextMessage) message).getText();
System.out.println(text);
}
}
} catch (JMSException e) {
e.printStackTrace();
}finally{
try {
if(messageConsumer != null){
messageConsumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
}catch (JMSException e) {
e.printStackTrace();
}
}
}
}
注意:接收方要調(diào)用connection的start方法才能接收到
7、運(yùn)行接收者的代碼,在ActiveMQ的web控制臺(tái)觀察消息數(shù)據(jù)