动态设置Spring DefaultMessageListenerContainer 的messageSelector
Spring JMS可以帮助开发人员快速地使用MQ进行消息的发送与接收。在异步接收方面,Spring 提供了MessageListenerContainer容器来接收消息。通过研究源码发现,DefaultMessageListenerContainer是支持动态改变messageSelector的:它有一个cacheLevel属性,默认值是4,把它改为2、1或0即可。各数字分别代表:
// Cache-level constants of DefaultMessageListenerContainer. In the invoker code quoted
// later in this article, a level <= 1 caches neither Session nor MessageConsumer,
// a level >= 2 keeps the Session, and a level >= 3 also keeps the MessageConsumer.

// No Session or MessageConsumer is kept between receive attempts.
public static final int CACHE_NONE = 0;
// Presumably only the shared Connection is reused (name-based; confirm against the Spring Javadoc).
public static final int CACHE_CONNECTION = 1;
// The Session is kept, but a MessageConsumer is created for each receive attempt.
public static final int CACHE_SESSION = 2;
// Both the Session and the MessageConsumer are kept, so a changed selector is not picked up.
public static final int CACHE_CONSUMER = 3;
// The default value according to this article; it also satisfies the ">= 3" check shown below.
public static final int CACHE_AUTO = 4;
在设置完cacheLevel后就可以动态设置messageSelector,Container就能用上最新的selector了。
Spring配置如下
<!-- Listener container whose messageSelector is meant to be changed at runtime. -->
<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="receiverQueue" />
    <property name="messageListener" ref="jmsReceiver" />
    <property name="concurrentConsumers" value="10" />
    <!-- Initial selector; it can be replaced later via setMessageSelector(...). -->
    <property name="messageSelector" value="CLIENT='DEMO'" />
    <!-- 2 = CACHE_SESSION: the consumer is not cached, so each receive builds a new one. -->
    <property name="cacheLevel" value="2" />
</bean>
修改messageSelector的代码如下:
// Look up the listener container bean and replace its selector while it is running.
DefaultMessageListenerContainer container =
        (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer");
container.setMessageSelector("CLIENT='DEMO2'");
源码分析:
//DefaultMessageListenerContainer类中, 每次收消息都会call的方法private boolean invokeListener() throws JMSException {initResourcesIfNecessary();boolean messageReceived = DefaultMessageListenerContainer.this.receiveAndExecute(this, this.session, this.consumer);this.lastMessageSucceeded = true;return messageReceived; }//这里就是使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,//当cacheLevel<3的时候,this.consumer会为nullprivate void initResourcesIfNecessary() throws JMSException { if (DefaultMessageListenerContainer.this.getCacheLevel() <= 1) { updateRecoveryMarker(); } else { if ((this.session == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 2)) { updateRecoveryMarker(); this.session = DefaultMessageListenerContainer.this.createSession(DefaultMessageListenerContainer.this.getSharedConnection()); } if ((this.consumer == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 3)) this.consumer = DefaultMessageListenerContainer.this.createListenerConsumer(this.session); } }//在这个方法中可以发现当传入consumer为null时,会生成一个新的consumerprotected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException { Connection conToClose = null; Session sessionToClose = null; MessageConsumer consumerToClose = null; try { Session sessionToUse = session; boolean transactional = false; if (sessionToUse == null) { sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( getConnectionFactory(), this.transactionalResourceFactory, true); transactional = sessionToUse != null; } if (sessionToUse == null) { Connection conToUse = null; if (sharedConnectionEnabled()) { conToUse = getSharedConnection(); } else { conToUse = createConnection(); conToClose = conToUse; conToUse.start(); } sessionToUse = createSession(conToUse); sessionToClose = sessionToUse; } MessageConsumer consumerToUse = consumer; if (consumerToUse == null) { consumerToUse = createListenerConsumer(sessionToUse); consumerToClose = consumerToUse; } Message 
message = receiveMessage(consumerToUse); if (message != null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + sessionToUse + "]"); } messageReceived(invoker, sessionToUse); boolean exposeResource = (!(transactional)) && (isExposeListenerSession()) && (!(TransactionSynchronizationManager.hasResource(getConnectionFactory()))); if (exposeResource) TransactionSynchronizationManager.bindResource( getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); try { doExecuteListener(sessionToUse, message); } catch (Throwable ex) { if (status != null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Rolling back transaction because of listener exception thrown: " + ex); } status.setRollbackOnly(); } handleListenerException(ex); if (ex instanceof JMSException) throw ((JMSException)ex); } finally { if (exposeResource) TransactionSynchronizationManager.unbindResource(getConnectionFactory()); } return true; } if (this.logger.isTraceEnabled()) { this.logger.trace("Consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + sessionToUse + "] did not receive a message"); } noMessageReceived(invoker, sessionToUse); return false; } finally { JmsUtils.closeMessageConsumer(consumerToClose); JmsUtils.closeSession(sessionToClose); ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); } }
?
?