ActiveMQ的消息接收的问题(使用MessageListener)
在用ActiveMQ的MessageListener消息接收模式的时候,
我setMessageListener了本类,也就是consumer.setMessageListener(this);
然后,又在onMessage里面做了相应消息的处理。
然后提交了session。
我的问题是,
只关闭这个session,
这个class会结束并进入回收环节吗?
因为我这个类是一个TimerTask,
每隔一段时间启动一次来接收消息,
我想知道的是,如果第一次收到消息,调用了onMessage方法以后,
这个任务会不会就彻底结束了?
还是
不只要在onMessage方法里关闭session对象,
还要在里面关闭Connection对象,
这个任务才算完全的执行完。
如果不关,这个链接会不会一直存在,直到整个程序停止,或者ActiveMQ停止了,
才能结束?
这样的话,我接受的内容如果一多,那还不把内存撑暴啊?
PS:
打字的时候突然想到,
这个conn如果设置成static的,
取得conn也在static块里面,
是不是再执行这个任务的时候,
就不用每次都建立链接,
而直接从这个链接里取得session,然后再取consumer就可以了呢?
这个相当于长链接的conn会不会成为系统里的结石??
贴些代码
就是红色的XXXX这里要不要关掉那个conn对象。
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*
* @author
*
* 1. 数据接收线程监听ActiveMQ队列,并读取消息。。
*
*/
public class ReceiveTask extends TimerTask implements MessageListener {
private static final Log log = LogFactory.getLog(ReceiveTask.class);
private Session session;
private javax.jms.Connection conn = null ;
private MessageConsumer consumer = null;
/**
*
*/
public void run() {
//开始监听设置
try {
log.info("数据消费者开始启动...");
conn = MQConnection.getJmsConnection();
conn.start();
// 得到session
session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 通过本地节点的接收队列名,监听需要监听的队列名
Destination destination = session.createQueue("QueueName");
MessageProducer replyProducer = session.createProducer(null);
// 设置传送模式为“非持久性消息”
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 初始化消息的消费者
consumer = session.createConsumer(destination);
// 设置监听器,有消息来得时候回调MessageListener.onMessage方法
consumer.setMessageListener(this);
log.info("数据消费者启动完成...");
} catch (Exception e) {
log.info("数据消费者启动异常...");
}
}
/**
* MessageListener.onMessage
*/
public void onMessage(Message message) {
try {
// 各种处理,略
//提交session
session.commit();
}
} catch (Exception e) {
MQConnection.rollback(session);
MQUtil.log(e, log);
} finally {
// XXXX
}
}
} ActiveMQ
[解决办法]
一般都是在客户端关闭seesion conection,代码例子
@Override
public void convertAndSend(Event message) {
// build ConnectionFactory And Queue is necessary
buildConnectionFactoryAndQueue();
Connection connect = null;
Session session = null;
MessageProducer producer = null;
try {
connect = jmsConnectionFactory.createConnection();
session = connect.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
producer = session.createProducer(queue);
// create a JMS message and send it
ObjectMessage objMsg = session.createObjectMessage(message);
// set message selector
String messageSelector = message.getMessageSelector();
objMsg.setStringProperty("messageReceiver", messageSelector);
producer.send(objMsg);
} catch (JMSException e) {
String errorMessage = "JMSException while queueing HTTP JMS Message";
throw new EventRuntimeException(errorMessage, e);
} finally {
SafeCloseUtil.close(producer); // 这里关闭 producer
SafeCloseUtil.close(session); // 这里关闭 producer
SafeCloseUtil.close(connect);
}
}
@Override
public void onMessage(Message message) {
printLogMessage("start public function onMessage()..");
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
Event event = (Event) objMsg.getObject();
// if Re deliver message warn the message
if (message.getJMSRedelivered()) {
log.warn("...", event.getClass().getSimpleName());
}
// out Put Event Log
outPutEventLog(event);
// dispatch Event
dispatchEvent(event);
} else {
log.error("This MDB message was not instance of ObjectMessage; ignoring.");
}
printLogMessage("end public function onMessage()..");
} catch (JMSException e) {
String errorMessage = "JMS Exception while Listener message.errorMessage:"
+ e.getMessage();
log.error(errorMessage);
throw new EventRuntimeException(errorMessage, e);
}
}
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="failover:(tcp://10.60.60.107:61616?wireFormat.maxInactivityDuration=0)&maxReconnectDelay=1000" />
</bean>
</property>
<!-- 连接数 -->
<property name="maxConnections" value="1" />
<!-- Session总数 -->
<property name="maximumActive" value="500" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<!--
<property name="defaultDestination" ref="oamTmpTopic" />
-->
<property name="explicitQosEnabled" value="true" />
<!-- 1为非持久化,2为持久化 -->
<property name="deliveryMode" value="1" />
</bean>