JMS调用IBM MQ 监听方式的点对点模式
前面三篇,第一篇讲了安装IBM MQ时遇到的一些问题。第二篇讲了点对点模式的调用。第三篇讲了发布订阅模式。本篇说一下监听模式。
?
监听模式只是在消费者端监听就可以了。对于消息发布者,代码不用做改动。改动的代码用<span>标签标注(代码不能修改颜色?):
package test2;// SCCSID "@(#) MQMBID sn=p000-L120604 su=_H-IvIK4nEeGko6IWl3MDhA pn=MQJavaSamples/jms/JmsConsumer.java"/* * <copyright * notice="lm-source-program" * pids="5724-H72,5655-R36,5655-L82,5724-L26," * years="2008,2012" * crc="39457954" > * Licensed Materials - Property of IBM * * 5724-H72,5655-R36,5655-L82,5724-L26, * * (C) Copyright IBM Corp. 2008, 2012 All Rights Reserved. * * US Government Users Restricted Rights - Use, duplication or * disclosure restricted by GSA ADP Schedule Contract with * IBM Corp. * </copyright> */import javax.jms.Connection;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import com.ibm.msg.client.jms.JmsConnectionFactory;import com.ibm.msg.client.jms.JmsFactoryFactory;import com.ibm.msg.client.wmq.WMQConstants;/** * A JMS consumer (receiver or subscriber) application that receives a message from the named * destination (queue or topic). * * Tip: A subscriber application must be started before the publisher application. * * Notes: * * API type: IBM JMS API (v1.1, unified domain) * * Messaging domain: Point-to-point or Publish-Subscribe * * Provider type: WebSphere MQ * * Connection mode: Client connection * * JNDI in use: No * * Usage: * * JmsConsumer -m queueManagerName -d destinationName [-h host -p port -l channel] * * for example: * * JmsConsumer -m QM1 -d Q1 * * JmsConsumer -m QM1 -d topic://foo -h localhost -p 1414 */public class JmsConsumer { private static String host = "localhost"; private static int port = 1414; private static String channel = "SYSTEM.DEF.SVRCONN"; private static String queueManagerName = null; private static String destinationName = null; private static boolean isTopic = true; private static int timeout = 15000; // in ms or 15 seconds // System exit status value (assume unset value to be 1) private static int status = 1; /** * Main method * * @param args */ public static void main(String[] args) { // Parse the argumentsargs = new String[]{"-m","QMTest", "-d","testQueue"}; parseArgs(args); // Variables Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { // Create a connection factory JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); JmsConnectionFactory cf = ff.createConnectionFactory(); // Set the properties cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host); cf.setIntProperty(WMQConstants.WMQ_PORT, port); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, channel); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, queueManagerName); // Create JMS objects connection = cf.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); if (isTopic) { destination = session.createTopic(destinationName); } else { destination = session.createQueue(destinationName); } consumer = session.createConsumer(destination); // Start the connection connection.start(); consumer.setMessageListener(new MessageListener(){ public void onMessage(Message m) { //如果想收到完整的消息“ System.out.println(m); //如果想收到消息内容,执行下面的代码 TextMessage textMsg = (TextMessage) m; try { System.out.println(textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // And, receive the message //在指定的超市时间内接收下一条消息// Message message = consumer.receive(timeout);// if (message != null) {// System.err.println("Received message:\n" + message);//// System.out.println("Received message:\n" + message);// }// else {// System.out.println("No message received!\n");// recordFailure(null);// } recordSuccess(); } catch (JMSException jmsex) { recordFailure(jmsex); } finally { if (consumer != null) { try { consumer.close(); } catch (JMSException jmsex) { System.out.println("Consumer could not be closed."); recordFailure(jmsex); } } if (session != null) { try { session.close(); } catch (JMSException jmsex) { System.out.println("Session could not be closed."); recordFailure(jmsex); } } if (connection != null) { try { connection.close(); } catch (JMSException jmsex) { System.out.println("Connection could not be closed."); recordFailure(jmsex); } } } System.exit(status); return; } // end main() /** * Process a JMSException and any associated inner exceptions. * * @param jmsex */ private static void processJMSException(JMSException jmsex) { System.out.println(jmsex); Throwable innerException = jmsex.getLinkedException(); if (innerException != null) { System.out.println("Inner exception(s):"); } while (innerException != null) { System.out.println(innerException); innerException = innerException.getCause(); } return; } /** * Record this run as successful. */ private static void recordSuccess() { System.out.println("SUCCESS"); status = 0; return; } /** * Record this run as failure. * * @param ex */ private static void recordFailure(Exception ex) { if (ex != null) { if (ex instanceof JMSException) { processJMSException((JMSException) ex); } else { System.out.println(ex); } } System.out.println("FAILURE"); status = -1; return; } /** * Parse user supplied arguments. * * @param args */ private static void parseArgs(String[] args) { try { int length = args.length; if (length == 0) { throw new IllegalArgumentException("No arguments! Mandatory arguments must be specified."); } if ((length % 2) != 0) { throw new IllegalArgumentException("Incorrect number of arguments!"); } int i = 0; while (i < length) { if ((args[i]).charAt(0) != '-') { throw new IllegalArgumentException("Expected a '-' character next: " + args[i]); } char opt = (args[i]).toLowerCase().charAt(1); switch (opt) { case 'h' : host = args[++i]; break; case 'p' : port = Integer.parseInt(args[++i]); break; case 'l' : channel = args[++i]; break; case 'm' : queueManagerName = args[++i]; break; case 'd' : destinationName = args[++i]; break; default : { throw new IllegalArgumentException("Unknown argument: " + opt); } } ++i; } if (queueManagerName == null) { throw new IllegalArgumentException("A queueManager name must be specified."); } if (destinationName == null) { throw new IllegalArgumentException("A destination name must be specified."); } // Whether the destination is a queue or a topic. Apply a simple check. if (destinationName.startsWith("topic://")) { isTopic = true; } else { // Otherwise, let's assume it is a queue. isTopic = false; } } catch (Exception e) { System.out.println(e.getMessage()); printUsage(); System.exit(-1); } return; } /** * Display usage help. */ private static void printUsage() { System.out.println("\nUsage:"); System.out .println("JmsConsumer -m queueManagerName -d destinationName [-h host -p port -l channel]"); return; }} // end class
?