首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

activemq跟spring结合使用

2012-11-01 
activemq和spring结合使用activemq上一次已经讲解了安装、启动、数据库的持久化配置等。?这次主要记录下,如何

activemq和spring结合使用

activemq上一次已经讲解了安装、启动、数据库的持久化配置等。

?

这次主要记录下,如何跟spring结合使用,如何发送消息以及进行消费。

?

消息产生者向JMS发送消息的步骤


(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)使用消息生产者MessageSender发送消息


消息消费者从JMS接受消息的步骤


(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver


?

?

?1.下面是使用了JTA配置、以及mq进行数据库持久化的spring相关配置。

?<!--建立mqfactory--><bean id="xaMQFactory" value="tcp://localhost:61616" /> </bean><!--建立connectionfactory,用jta的事务进行管理--><bean id="connectionFactory" init-method="init" destroy-method="close"> <property name="uniqueResourceName"> <value>QUEUE_BROKER</value> </property> <property name="xaConnectionFactory"> <ref bean="xaMQFactory"/> </property> </bean><!--建立发送队列,以queuem模式--><bean id="queueDestination" value="solrMessageQueue"/> </bean><!--jdbcTemplate 用于发送消息,由于是使用数据库持久化-->?<bean id="jdbcTemplate" init-method="init" destroy-method="close">??????? <property name="uniqueResourceName">??????????? <value>ZZGRID_MAIN_JDBC_RESOURCE</value>??????? </property>??<property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource" />??????? <property name="xaProperties">??????????? <props>??????????????? <prop key="user">username</prop>??????????????? <prop key="password">password</prop>??????????????? <prop key="URL" >jdbc:oracle:thin:@localhost:1521:oracle</prop>??????????? </props>??????? </property>???</bean>

?

2.不使用JTA,只进行数据库持久化的Spring配置

<!--建立connectionfactory-->?<bean id="connectionFactory" value="tcp://localhost:61616"/> ??</bean> <!--建立发送队列,以queue模式--><bean id="queueDestination" value="solrMessageQueue"/>  </bean><!--jdbcTemplate 用于发送消息,由于是使用数据库持久化--> <bean id="jdbcTemplate" destroy-method="close">??<property name="driverClass"???value="oracle.jdbc.driver.OracleDriver" />??<property name="jdbcUrl"?value="jdbc:oracle:thin:@localhost:1521:oracle"/>??<property name="user" value="username" />??<property name="password" value="password" />??<property name="acquireIncrement" value="5" />??<property name="acquireRetryAttempts" value="30" />??<property name="acquireRetryDelay" value="1000" />??<property name="idleConnectionTestPeriod" value="60" />??<property name="testConnectionOnCheckin" value="true" />??<property name="automaticTestTable" value="C3P0Table" /> ?</bean>

?

3.使用MQ默认的硬盘读写,结合spring的配置

?

<!--建立connectionfactory--> <bean id="connectionFactory" value="tcp://localhost:61616"/>   </bean> <!--建立发送队列,以queue模式--><bean id="queueDestination" value="solrMessageQueue"/>  </bean><!--建立发送队列,以queue模式--><bean id="queueDestination" value="solrMessageQueue"/>  </bean><!--jdbcTemplate 用于发送消息--> <bean id="jdbcTemplate" ref="connectionFactory"/>     </bean>

?

?

?MQ消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。这里演示相对最复杂的 ObjectMessage。

?

消息发送主体

package xxxx;import javax.jms.ConnectionFactory;import javax.jms.Destination;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Component;import com.xxx.jms.SolrMessage;import com.xxx.jms.SolrMessageConvert;@Component("activeMQMessageSender")public class ActiveMQMessageSender implements ApplicationContextAware{private ApplicationContext appContext;@Overridepublic void setApplicationContext(ApplicationContext appContext)throws BeansException {this.appContext=appContext;}public void send(SolrMessage solrMessage) {String isSenderSolrMsg = "true";if (isSenderSolrMsg != null && Boolean.valueOf(isSenderSolrMsg)){JmsTemplate template = new JmsTemplate();template.setConnectionFactory((ConnectionFactory) appContext.getBean("connectionFactory"));template.setDefaultDestination((Destination) appContext.getBean("queueDestination"));template.setSessionTransacted(true);//如果不使用事务,这里修改成falsetemplate.setMessageConverter(new SolrMessageConvert());template.convertAndSend(solrMessage);}}}

?

消息对象
?

package com.xxx.jms;public class SolrMessage {private Long id;private String type;private String mode;private String idCardNo;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getMode() {return mode;}public void setMode(String mode) {this.mode = mode;}public String getIdCardNo() {return idCardNo;}public void setIdCardNo(String idCardNo) {this.idCardNo = idCardNo;}public SolrMessage(String idCardNo, String type) {this.idCardNo = idCardNo;this.mode = Mode.DELETE.toString();this.type = type;}public SolrMessage(Long id, String type, String mode) {this.id = id;this.type = type;this.mode = mode;}public SolrMessage() {}}

?msg消息和send消息的转化类

package com.xxx.jms;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.Session;import org.springframework.jms.support.converter.MessageConversionException;import org.springframework.jms.support.converter.MessageConverter;public class SolrMessageConvert implements MessageConverter {@Overridepublic Object fromMessage(Message message) throws JMSException,MessageConversionException {SolrMessage solrMessage = new SolrMessage();if (message != null) {MapMessage msg = (MapMessage) message;solrMessage.setId(msg.getLong("id"));solrMessage.setIdCardNo(msg.getString("idCardNo"));solrMessage.setMode(msg.getString("mode"));solrMessage.setType(msg.getString("type"));}return solrMessage;}@Overridepublic Message toMessage(Object object, Session session)throws JMSException, MessageConversionException {MapMessage mapMessage = session.createMapMessage();if (null != object) {SolrMessage solrMessage = (SolrMessage) object;if (null != solrMessage.getId()) {mapMessage.setLong("id", solrMessage.getId());}if (null != solrMessage.getType()) {mapMessage.setString("type", solrMessage.getType());}if (null != solrMessage.getMode()) {mapMessage.setString("mode", solrMessage.getMode());}if (null != solrMessage.getIdCardNo()) {mapMessage.setString("idCardNo", solrMessage.getIdCardNo());}}return mapMessage;}}

?

?

mq消息的消费也是类似的,主要涉及的spring配置如下

?

  <!--消息转化类,用于send消息和mqmsg消息的转化--><bean id="messageConvert" />  <!--消息的消费--><bean id="messageListener" value="onRegister"/>            <property name="messageConverter" ref="messageConvert"/>    </bean>   <!--接收mq消息的监听器,用于检测mq消息的消费-->     <bean id="queueListenerContainer" ref="connectionFactory"/>            <property name="destination" ref="queueDestination"/>            <property name="messageListener" ref="messageListener"/>    </bean>

?

消息消费类(接收类)

package com.xxx;import org.springframework.beans.factory.annotation.Autowired;import com.xxx.solr.domain.SolrMessage;public class SolrRegisterListener {public void onRegister(SolrMessage solrMessage){System.out.println(solrMessage.getId() + ""+ solrMessage.getMode() + ""+ solrMessage.getType() + ""+ solrMessage.getIdCardNo());/**具体的业务逻辑 **对接收到的消息进行消费**/}}

?

这个编辑器有些不好用,预览模式和编辑模式以及正式的页面效果都不完全一致。。。。

?

?

1 楼 xosadan 2011-06-29   环境补充下  
MQ版本5.5
Spring2.3
JDK1.6 2 楼 zhangbenben 2011-06-30   activemq in action  有吗  发来一本 3 楼 xosadan 2011-06-30   如你所愿 附上 mq in action 4 楼 yanweiqi 2011-10-12   把源码上传上就好了。 5 楼 xz1367 2012-06-08   传说中的小吴?

热点排行