首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 网络技术 > 网络基础 >

动态设立Spring DefaultMessageListenerContainer 的messageSelector

2013-08-01 
动态设置Spring DefaultMessageListenerContainer 的messageSelectorSpring JMS可以帮助开发人员快速的使

动态设置Spring DefaultMessageListenerContainer 的messageSelector

Spring JMS可以帮助开发人员快速的使用MQ的发送与接收,在异步接收方面,Spring 提供了MessageListenerContainer的容器接收消息。通过研究源码发现DefaultMessageListenerContainer是支持动态改变messageSelector的。在DefaultMessageListenerContainer 中有个cacheLevel的属性默认是4,把它改动到2或1或0,数字分表代表

public static final int CACHE_NONE = 0;
public static final int CACHE_CONNECTION = 1;
public static final int CACHE_SESSION = 2;
public static final int CACHE_CONSUMER = 3;
public static final int CACHE_AUTO = 4;

在设置完cacheLevel后就可以动态设置messageSelector,Container就能用上最新的selector了。

Spring配置如下

<bean id="messageListenerContainer"        ref="jmsConnectionFactory" />          <property name="destination" ref="receiverQueue" />          <property name="messageListener" ref="jmsReceiver" />          <property name="concurrentConsumers" value="10" />                   <property name="messageSelector" value="CLIENT='DEMO'" />          <property name="cacheLevel" value="2"/>    </bean> 

?修改messageSelector代码如下

DefaultMessageListenerContainer messageListenerContainer = (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer");messageListenerContainer.setMessageSelector("CLIENT='DEMO2'");

?

源码分析:

//DefaultMessageListenerContainer类中, 每次收消息都会call的方法private boolean invokeListener() throws JMSException {initResourcesIfNecessary();boolean messageReceived = DefaultMessageListenerContainer.this.receiveAndExecute(this, this.session, this.consumer);this.lastMessageSucceeded = true;return messageReceived; }//这里就是使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,//当cacheLevel<3的时候,this.consumer会为nullprivate void initResourcesIfNecessary() throws JMSException {       if (DefaultMessageListenerContainer.this.getCacheLevel() <= 1) {         updateRecoveryMarker();       }       else {         if ((this.session == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 2)) {           updateRecoveryMarker();           this.session = DefaultMessageListenerContainer.this.createSession(DefaultMessageListenerContainer.this.getSharedConnection());         }         if ((this.consumer == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 3))           this.consumer = DefaultMessageListenerContainer.this.createListenerConsumer(this.session);       }     }//在这个方法中可以发现当传入consumer为null时,会生成一个新的consumerprotected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)     throws JMSException   {     Connection conToClose = null;     Session sessionToClose = null;     MessageConsumer consumerToClose = null;     try {       Session sessionToUse = session;       boolean transactional = false;       if (sessionToUse == null) {         sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(           getConnectionFactory(), this.transactionalResourceFactory, true);         transactional = sessionToUse != null;       }       if (sessionToUse == null) {         Connection conToUse = null;         if (sharedConnectionEnabled()) {           conToUse = getSharedConnection();         }         else {           conToUse = createConnection();           conToClose = conToUse;           conToUse.start();         }         sessionToUse = createSession(conToUse);         sessionToClose = sessionToUse;       }       MessageConsumer consumerToUse = consumer;       if (consumerToUse == null) {         consumerToUse = createListenerConsumer(sessionToUse);         consumerToClose = consumerToUse;       }       Message message = receiveMessage(consumerToUse);       if (message != null) {         if (this.logger.isDebugEnabled()) {           this.logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +              consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" +              sessionToUse + "]");         }         messageReceived(invoker, sessionToUse);         boolean exposeResource = (!(transactional)) && (isExposeListenerSession()) &&            (!(TransactionSynchronizationManager.hasResource(getConnectionFactory())));         if (exposeResource)           TransactionSynchronizationManager.bindResource(             getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));         try         {           doExecuteListener(sessionToUse, message);         }         catch (Throwable ex) {           if (status != null) {             if (this.logger.isDebugEnabled()) {               this.logger.debug("Rolling back transaction because of listener exception thrown: " + ex);             }             status.setRollbackOnly();           }           handleListenerException(ex);            if (ex instanceof JMSException)             throw ((JMSException)ex);         }         finally         {           if (exposeResource)             TransactionSynchronizationManager.unbindResource(getConnectionFactory());         }         return true;       }        if (this.logger.isTraceEnabled()) {         this.logger.trace("Consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") +            "session [" + sessionToUse + "] did not receive a message");       }       noMessageReceived(invoker, sessionToUse);       return false;     }     finally     {       JmsUtils.closeMessageConsumer(consumerToClose);       JmsUtils.closeSession(sessionToClose);       ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);     }   }

?

?

热点排行