JMS生产者/消费者模式的消息收发通用类。
JMS提供者为ActiveMQ。
?
import java.util.Map;import java.util.UUID;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.command.ActiveMQQueue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component;/** * mq通用类 * * @author Fu Wei * */@Componentpublic class ActiveMQQueueCommon {private static final Logger LOG = LoggerFactory.getLogger(ActiveMQQueueCommon.class);@Autowiredprivate JmsTemplate jmsTemplate;/** * 异步发送 不支持特定消息 * * @param reqQueue * @param text */public void asyncSend(ActiveMQQueue reqQueue, final String text) {LOG.debug("发送的XML文内容:{}", text);final String correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(text);msg.setJMSCorrelationID(correlationId);return msg;}});}/** * 异步发送,关联消息id * * @param reqQueue * @param text * @param propertyName * @param propertyValue 支持一个特定消息 */public void asyncSend(ActiveMQQueue reqQueue, final String text, final String propertyName, final String propertyValue) {LOG.debug("发送的XML文内容:{}", text);final String correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(text);msg.setJMSCorrelationID(correlationId);msg.setStringProperty(propertyName, propertyValue);return msg;}});}/** * 异步发送,关联消息id * * @param reqQueue * @param text * @param propertyName * @param propertyMap 选择器参数 */public void asyncSend(ActiveMQQueue reqQueue, final String text, final String propertyName, final Map<String, String> propertyMap) {LOG.debug("发送的XML文内容:{}", text);final String correlationId = 
UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(text);msg.setJMSCorrelationID(correlationId);if (propertyMap != null && propertyMap.size() > 0) {for (Map.Entry<String, String> map : propertyMap.entrySet()) {msg.setStringProperty(map.getKey(), map.getValue());}}return msg;}});}/** * 同步发送,不带消息特定属性 * * @param reqQueue * @param resQueue * @param messText * @param timeout * @return * @throws JMSException */public String syncSend(ActiveMQQueue reqQueue, final ActiveMQQueue resQueue, final String messText, long timeout) {return syncSend(reqQueue, resQueue, messText, timeout, null);}/** * 同步发送,带消息特定属性 * * @param reqQueue * @param resQueue * @param messText * @param timeout * @param propertyName * @param propertyValue * @return * @throws JMSException */public String syncSend(ActiveMQQueue reqQueue, final ActiveMQQueue resQueue, final String messText, long timeout, final Map<String, String> propertyMap) {LOG.debug("转发的消息:{}, 超时时间:{}", messText, timeout);final String correlationId = UUID.randomUUID().toString();jmsTemplate.send(reqQueue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage(messText);msg.setJMSReplyTo(resQueue);msg.setJMSCorrelationID(correlationId);// 添加消息特定属性if (propertyMap != null && propertyMap.size() > 0) {for (Map.Entry<String, String> map : propertyMap.entrySet()) {msg.setStringProperty(map.getKey(), map.getValue());}}return msg;}});jmsTemplate.setReceiveTimeout(timeout * 1000);TextMessage recvMsg = (TextMessage) jmsTemplate.receiveSelected(resQueue, "JMSCorrelationID = '" + correlationId + "'");String recvMessText = null;try {recvMessText = recvMsg.getText();} catch (JMSException e) {LOG.error("jms错误", e);}LOG.debug("propertyMap: {}, 返回的信息:{}", propertyMap, recvMessText);return recvMessText;}}?
?