向 WebSphere MQ 队列中 put 和 get 消息
连接 Queue Manager 的工具类:
package com.quest.mq;

import java.io.IOException;
import java.util.Hashtable;

import com.ibm.mq.MQException;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.pcf.PCFMessage;
import com.ibm.mq.pcf.PCFMessageAgent;

/**
 * Small helper that opens a client connection to a queue manager and attaches
 * a PCF message agent to it.
 *
 * <p>The connection is attempted in the constructor. A failure there is
 * reported on standard error but not rethrown, so {@link #getQueueManager()}
 * and {@link #getAgent()} may return {@code null}; callers should check.
 */
public class MQTool {

    public String queueManagerName = "QM";
    public String hostname = "localhost";
    public int port = 1411;
    public String channel = "SYSTEM.DEF.SVRCONN";

    /** Connection properties handed to the {@link MQQueueManager} constructor. */
    protected Hashtable<String, Object> properties = new Hashtable<String, Object>();

    public PCFMessageAgent agent = null;
    public MQQueueManager queueManager = null;

    /**
     * Stores the connection settings, connects to the queue manager and
     * connects a new PCF agent through that same queue manager handle.
     *
     * @param queueManagerName name of the queue manager to connect to
     * @param hostname host the queue manager listens on
     * @param port listener port
     * @param channel server-connection channel name
     */
    public MQTool(String queueManagerName, String hostname, int port, String channel) {
        this.queueManagerName = queueManagerName;
        this.hostname = hostname;
        this.port = port;
        this.channel = channel;
        try {
            queueManager = new MQQueueManager(queueManagerName, getConnectionProperties());
            agent = new PCFMessageAgent();
            agent.connect(queueManager);
        } catch (Exception e) {
            // Deliberately best-effort: construction never throws. Whatever was
            // not assigned before the failure (queueManager and/or agent) stays
            // null, so say clearly which connection failed.
            System.err.println("Failed to connect to queue manager " + queueManagerName
                    + " at " + hostname + ":" + port + " via channel " + channel);
            e.printStackTrace();
        }
    }

    /**
     * @return the queue manager opened by the constructor, or {@code null} if
     *         the connection attempt failed
     */
    public MQQueueManager getQueueManager() {
        return queueManager;
    }

    /**
     * Tells whether a string is {@code null} or has length zero.
     *
     * @param input the string to test
     * @return {@code true} if {@code input} is {@code null} or empty
     */
    public final boolean isNullOrEmpty(String input) {
        return input == null || input.length() == 0;
    }

    /**
     * Fills {@link #properties} from the current {@code hostname}, {@code port}
     * and {@code channel} fields, logs them to standard output and returns the
     * same table instance.
     *
     * @return the populated connection properties
     */
    private Hashtable<String, Object> getConnectionProperties() {
        properties.put("hostname", hostname);
        properties.put("port", port);
        properties.put("channel", channel);
        System.out.println("Connection informations: " + properties);
        System.out.println("Connect QM :" + queueManagerName);
        return properties;
    }

    /**
     * @return the PCF agent created by the constructor, or {@code null} if it
     *         could not be created
     */
    public PCFMessageAgent getAgent() {
        return agent;
    }

    /**
     * Sends a PCF request through the agent and returns the responses.
     *
     * <p>The agent is (re)connected to {@link #queueManager} before sending
     * and disconnected afterwards, whether or not the send succeeds.
     *
     * @param queuePCF the PCF request to send
     * @return the PCF responses returned by the agent
     * @throws MQException if connecting, sending or disconnecting fails
     * @throws IOException if the agent reports an I/O failure
     */
    protected PCFMessage[] pcfInquire(PCFMessage queuePCF) throws MQException, IOException {
        PCFMessageAgent pcfAgent = getAgent();
        pcfAgent.connect(queueManager);
        try {
            return pcfAgent.send(queuePCF);
        } finally {
            // Previously skipped when send() threw, leaving the agent connected.
            pcfAgent.disconnect();
        }
    }
}
?
对一条 Queue 放入消息、取出消息:
import java.io.IOException;import com.ibm.mq.MQException;import com.ibm.mq.MQGetMessageOptions;import com.ibm.mq.MQMessage;import com.ibm.mq.MQQueue;import com.ibm.mq.MQQueueManager;import com.ibm.mq.MQTopic;import com.ibm.mq.constants.CMQC;import com.ibm.mq.constants.MQConstants;import com.quest.mq.MQTool;public class MQTest {/** * @param args * @throws MQException * @throws IOException * @throws InterruptedException */public static void main(String[] args) throws MQException, IOException, InterruptedException {String queueManagerName = "QM1";String hostname = "127.0.0.1";int port = 1421;String channel = "SYSTEM.DEF.SVRCONN";MQTool mqTool = new MQTool(queueManagerName, hostname, port, channel);MQQueueManager queueManager = mqTool.getQueueManager();//放入消息MQQueue queue1 = queueManager.accessQueue("apple2", CMQC.MQOO_OUTPUT);MQMessage message1 = new MQMessage();message1.writeString("33333");queue1.put(message1);//不消费消息MQQueue queue2 = queueManager.accessQueue("apple2", MQConstants.MQOO_BROWSE | MQConstants.MQOO_INQUIRE);MQGetMessageOptions getMsgOption = new MQGetMessageOptions();getMsgOption.options = MQConstants.MQGMO_BROWSE_NEXT;for(int i=0; i< queue2.getCurrentDepth(); i++){MQMessage message = new MQMessage();queue2.get(message, getMsgOption);System.out.println(message.readStringOfCharLength(message.getDataLength()));}//消费消息MQQueue queue3 = queueManager.accessQueue("apple2", CMQC.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_INQUIRE);int depth = queue3.getCurrentDepth();for(int i=0; i < depth; i++){MQMessage message = new MQMessage();queue3.get(message);System.out.println(message.readStringOfCharLength(message.getDataLength())+ "----");System.out.println("===");}}}
?
?