activemq和spring结合使用
activemq上一次已经讲解了安装、启动、数据库的持久化配置等。
?
这次主要记录下,如何跟spring结合使用,如何发送消息以及进行消费。
?
消息产生者向JMS发送消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)使用消息生产者MessageSender发送消息
消息消费者从JMS接受消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver
?
?
?1.下面是使用了JTA配置、以及mq进行数据库持久化的spring相关配置。
?<!--建立mqfactory--><bean id="xaMQFactory" value="tcp://localhost:61616" /> </bean><!--建立connectionfactory,用jta的事务进行管理--><bean id="connectionFactory" init-method="init" destroy-method="close"> <property name="uniqueResourceName"> <value>QUEUE_BROKER</value> </property> <property name="xaConnectionFactory"> <ref bean="xaMQFactory"/> </property> </bean><!--建立发送队列,以queuem模式--><bean id="queueDestination" value="solrMessageQueue"/> </bean><!--jdbcTemplate 用于发送消息,由于是使用数据库持久化-->?<bean id="jdbcTemplate" init-method="init" destroy-method="close">??????? <property name="uniqueResourceName">??????????? <value>ZZGRID_MAIN_JDBC_RESOURCE</value>??????? </property>??<property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource" />??????? <property name="xaProperties">??????????? <props>??????????????? <prop key="user">username</prop>??????????????? <prop key="password">password</prop>??????????????? <prop key="URL" >jdbc:oracle:thin:@localhost:1521:oracle</prop>??????????? </props>??????? </property>???</bean>
?
2.不使用JTA,只进行数据库持久化的Spring配置
<!--建立connectionfactory-->?<bean id="connectionFactory" value="tcp://localhost:61616"/> ??</bean> <!--建立发送队列,以queue模式--><bean id="queueDestination" value="solrMessageQueue"/> </bean><!--jdbcTemplate 用于发送消息,由于是使用数据库持久化--> <bean id="jdbcTemplate" destroy-method="close">??<property name="driverClass"???value="oracle.jdbc.driver.OracleDriver" />??<property name="jdbcUrl"?value="jdbc:oracle:thin:@localhost:1521:oracle"/>??<property name="user" value="username" />??<property name="password" value="password" />??<property name="acquireIncrement" value="5" />??<property name="acquireRetryAttempts" value="30" />??<property name="acquireRetryDelay" value="1000" />??<property name="idleConnectionTestPeriod" value="60" />??<property name="testConnectionOnCheckin" value="true" />??<property name="automaticTestTable" value="C3P0Table" /> ?</bean>
?
3.使用MQ默认的硬盘读写,结合spring的配置
?
<!--建立connectionfactory--> <bean id="connectionFactory" value="tcp://localhost:61616"/> </bean> <!--建立发送队列,以queue模式--><bean id="queueDestination" value="solrMessageQueue"/> </bean><!--建立发送队列,以queue模式--><bean id="queueDestination" value="solrMessageQueue"/> </bean><!--jdbcTemplate 用于发送消息--> <bean id="jdbcTemplate" ref="connectionFactory"/> </bean>
?
?
?MQ消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。这里演示相对最复杂的 ObjectMessage。
?
消息发送主体
package xxxx;import javax.jms.ConnectionFactory;import javax.jms.Destination;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Component;import com.xxx.jms.SolrMessage;import com.xxx.jms.SolrMessageConvert;@Component("activeMQMessageSender")public class ActiveMQMessageSender implements ApplicationContextAware{private ApplicationContext appContext;@Overridepublic void setApplicationContext(ApplicationContext appContext)throws BeansException {this.appContext=appContext;}public void send(SolrMessage solrMessage) {String isSenderSolrMsg = "true";if (isSenderSolrMsg != null && Boolean.valueOf(isSenderSolrMsg)){JmsTemplate template = new JmsTemplate();template.setConnectionFactory((ConnectionFactory) appContext.getBean("connectionFactory"));template.setDefaultDestination((Destination) appContext.getBean("queueDestination"));template.setSessionTransacted(true);//如果不使用事务,这里修改成falsetemplate.setMessageConverter(new SolrMessageConvert());template.convertAndSend(solrMessage);}}}
?
消息对象
?
package com.xxx.jms;public class SolrMessage {private Long id;private String type;private String mode;private String idCardNo;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getMode() {return mode;}public void setMode(String mode) {this.mode = mode;}public String getIdCardNo() {return idCardNo;}public void setIdCardNo(String idCardNo) {this.idCardNo = idCardNo;}public SolrMessage(String idCardNo, String type) {this.idCardNo = idCardNo;this.mode = Mode.DELETE.toString();this.type = type;}public SolrMessage(Long id, String type, String mode) {this.id = id;this.type = type;this.mode = mode;}public SolrMessage() {}}
?msg消息和send消息的转化类
package com.xxx.jms;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.Session;import org.springframework.jms.support.converter.MessageConversionException;import org.springframework.jms.support.converter.MessageConverter;public class SolrMessageConvert implements MessageConverter {@Overridepublic Object fromMessage(Message message) throws JMSException,MessageConversionException {SolrMessage solrMessage = new SolrMessage();if (message != null) {MapMessage msg = (MapMessage) message;solrMessage.setId(msg.getLong("id"));solrMessage.setIdCardNo(msg.getString("idCardNo"));solrMessage.setMode(msg.getString("mode"));solrMessage.setType(msg.getString("type"));}return solrMessage;}@Overridepublic Message toMessage(Object object, Session session)throws JMSException, MessageConversionException {MapMessage mapMessage = session.createMapMessage();if (null != object) {SolrMessage solrMessage = (SolrMessage) object;if (null != solrMessage.getId()) {mapMessage.setLong("id", solrMessage.getId());}if (null != solrMessage.getType()) {mapMessage.setString("type", solrMessage.getType());}if (null != solrMessage.getMode()) {mapMessage.setString("mode", solrMessage.getMode());}if (null != solrMessage.getIdCardNo()) {mapMessage.setString("idCardNo", solrMessage.getIdCardNo());}}return mapMessage;}}
?
?
mq消息的消费也是类似的,主要涉及的spring配置如下
?
<!--消息转化类,用于send消息和mqmsg消息的转化--><bean id="messageConvert" /> <!--消息的消费--><bean id="messageListener" value="onRegister"/> <property name="messageConverter" ref="messageConvert"/> </bean> <!--接收mq消息的监听器,用于检测mq消息的消费--> <bean id="queueListenerContainer" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="messageListener"/> </bean>
?
消息消费类(接收类)
package com.xxx;import org.springframework.beans.factory.annotation.Autowired;import com.xxx.solr.domain.SolrMessage;public class SolrRegisterListener {public void onRegister(SolrMessage solrMessage){System.out.println(solrMessage.getId() + ""+ solrMessage.getMode() + ""+ solrMessage.getType() + ""+ solrMessage.getIdCardNo());/**具体的业务逻辑 **对接收到的消息进行消费**/}}
?
这个编辑器有些不好用,预览模式和编辑模式以及正式的页面效果都不完全一致。。。。
?
?
1 楼 xosadan 2011-06-29 环境补充下