首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

JMS生产者消费者方式收发通用类

2012-07-24 
JMS生产者消费者模式收发通用类。JMS提供者为ActiveMQ。import java.util.Map; import java.util.UUID; import …

JMS生产者消费者模式收发通用类

JMS 提供者为 ActiveMQ。

?

import java.util.Map;import java.util.UUID;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.command.ActiveMQQueue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component;/** * mq通用类 *  * @author Fu Wei *  */@Componentpublic class ActiveMQQueueCommon {private static final Logger LOG = LoggerFactory.getLogger(ActiveMQQueueCommon.class);@Autowiredprivate JmsTemplate jmsTemplate;/** * 异步发送 不支持特定消息 *  * @param reqQueue * @param text */public void asyncSend(ActiveMQQueue reqQueue, final String text) {LOG.debug("发送的XML文内容:{}", text);final String correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(text);msg.setJMSCorrelationID(correlationId);return msg;}});}/** * 异步发送,关联消息id *  * @param reqQueue * @param text * @param propertyName * @param propertyValue 支持一个特定消息 */public void asyncSend(ActiveMQQueue reqQueue, final String text, final String propertyName,        final String propertyValue) {LOG.debug("发送的XML文内容:{}", text);final String correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(text);msg.setJMSCorrelationID(correlationId);msg.setStringProperty(propertyName, propertyValue);return msg;}});}/** * 异步发送,关联消息id *  * @param reqQueue * @param text * @param propertyName * @param propertyMap 选择器参数 */public void asyncSend(ActiveMQQueue reqQueue, final String text, final String propertyName,        final Map<String, String> propertyMap) {LOG.debug("发送的XML文内容:{}", text);final String 
correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(text);msg.setJMSCorrelationID(correlationId);if (propertyMap != null && propertyMap.size() > 0) {for (Map.Entry<String, String> map : propertyMap.entrySet()) {msg.setStringProperty(map.getKey(), map.getValue());}}return msg;}});}/** * 同步发送,不带消息特定属性 *  * @param reqQueue * @param resQueue * @param messText * @param timeout * @return * @throws JMSException */public String syncSend(ActiveMQQueue reqQueue, final ActiveMQQueue resQueue, final String messText, long timeout) {return syncSend(reqQueue, resQueue, messText, timeout, null);}/** * 同步发送,带消息特定属性 *  * @param reqQueue * @param resQueue * @param messText * @param timeout * @param propertyName * @param propertyValue * @return * @throws JMSException */public String syncSend(ActiveMQQueue reqQueue, final ActiveMQQueue resQueue, final String messText, long timeout,        final Map<String, String> propertyMap) {LOG.debug("转发的消息:{}, 超时时间:{}", messText, timeout);final String correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(messText);msg.setJMSReplyTo(resQueue);msg.setJMSCorrelationID(correlationId);// 添加消息特定属性if (propertyMap != null && propertyMap.size() > 0) {for (Map.Entry<String, String> map : propertyMap.entrySet()) {msg.setStringProperty(map.getKey(), map.getValue());}}return msg;}});jmsTemplate.setReceiveTimeout(timeout * 1000);TextMessage recvMsg = (TextMessage) jmsTemplate.receiveSelected(resQueue, "JMSCorrelationID = '"        + correlationId + "'");String recvMessText = null;try {recvMessText = recvMsg.getText();} catch (JMSException e) {LOG.error("jms错误", e);}LOG.debug("propertyMap: {}, 返回的信息:{}", propertyMap, recvMessText);return recvMessText;}}
?

?

热点排行