Spring-ActiveMQ的点对点和Topic
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example.activemq</groupId> <artifactId>activemq-test</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>activemq-test</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <org.springframework.version>3.1.1.RELEASE</org.springframework.version> </properties> <repositories> <repository><id>kxcomm-maven</id><name>Maven kxcomm Repository</name><url>http://122.13.0.56:8088/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2</version></dependency><dependency><groupId>commons-configuration</groupId><artifactId>commons-configuration</artifactId><version>1.6</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.1.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.8.3</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-asm</artifactId><version>${org.springframework.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${org.springframework.version}</version></dependency><dependency><groupId>com.davidkarlsen.commonstransaction.spring</groupId><artifactId>commons-transaction-spring</artifactId><version>0.9</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.2-beta1</version></dependency><dependency><groupId>fastutil</groupId><artifactId>fastutil</artifactId><version>5.0.9</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>0.9.27</version><scope>compile</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>0.9.27</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.1</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId><version>1.6.1</version><scope>runtime</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.5</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.8.0</version></dependency> </dependencies></project>
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName" default-lazy-init="true"><import resource="activemq-test.xml"/></beans>
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"><!-- 创建工厂连接 --><bean id="connectionFactory" value="tcp://localhost:61616" /></bean><bean id="jmsTemplate" ref="connectionFactory" /> <property name="defaultDestination" ref="rantzDestination" /> </bean><!-- Point-to-Point --><!-- activeMQ消息目标 队列 --><bean id="rantzDestination" value="rantz.marketing.queue"></constructor-arg></bean><!-- activeMQ消息目标 主题--> <!--<bean id="rantzDestination" value="rantz.marketing.queue"></constructor-arg>--> <!--</bean>--> <bean id="producer" ref="jmsTemplate" /><property name="destination" ref="rantzDestination" /></bean><bean id="consumer" ref="jmsTemplate" /> </bean> <!-- Point-to-Point End--><!-- Topic --><bean id="topic" autowire="constructor"> <constructor-arg index="0" value="kxcomm.mms.topic" /> </bean> <bean id="control" autowire="constructor"> <constructor-arg index="0" value="kxcomm.mms.control" /> </bean><bean id="myListener" ref="connectionFactory" /> <property name="topic" ref="topic" /> <property name="control" ref="control" /> </bean> <bean id="myPublisher" ref="connectionFactory" /> <property name="topic" ref="topic" /> <property name="control" ref="control" /> </bean><!-- Topic End--></beans>
package activemq.test.model;import java.io.Serializable;public class User implements Serializable{private static final long serialVersionUID = -3098636047897519268L;private String name;private String sex;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getSex() {return sex;}public void setSex(String sex) {this.sex = sex;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User [name=" + name + ", sex=" + sex + ", age=" + age + "]";}}
package activemq.test.p2p.consumer;import org.springframework.jms.core.JmsTemplate;import activemq.test.model.User;public class MarketingReceiverGatewayImpl {private JmsTemplate jmsTemplate;public JmsTemplate getJmsTemplate() {return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}public MarketingReceiverGatewayImpl() {}public void receiveMotorist() throws Exception{User message = (User)jmsTemplate.receiveAndConvert();System.out.println("reviced msg is:" + message.toString());}}
package activemq.test.p2p.consumer;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartConsumer {public static void main(String[] args) {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");MarketingReceiverGatewayImpl rantzMarketingGateway= (MarketingReceiverGatewayImpl) context.getBean("consumer");System.out.println("Receive Start ...");try {while(true){rantzMarketingGateway.receiveMotorist();}} catch (Exception e) {e.printStackTrace();} }}
package activemq.test.p2p.producer;public interface IRantzMarketingGateway {/** * * 发送文本对象 * * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */public void sendMotoristInfo();/** * * 发送对象 * * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */public void sendObjectInfo();}
package activemq.test.p2p.producer;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import activemq.test.model.User;public class RantzMarketingGatewayImpl implements IRantzMarketingGateway {private JmsTemplate jmsTemplate;private Destination destination;public JmsTemplate getJmsTemplate() {return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}public Destination getDestination() {return destination;}public void setDestination(Destination destination) {this.destination = destination;}public void sendMotoristInfo() {MessageCreator msg = new MessageCreator(){public Message createMessage(Session session) throws JMSException {return session.createTextMessage("这是一个测试,"+System.currentTimeMillis());}};jmsTemplate.send(destination, msg);}public void sendObjectInfo() {User u = new User();u.setAge(17);u.setName("yuky"+System.currentTimeMillis());u.setSex("女");jmsTemplate.convertAndSend(u);}}
package activemq.test.p2p.producer;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartProducer {public static void main(String[] args) {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");IRantzMarketingGateway rantzMarketingGateway= (RantzMarketingGatewayImpl) context.getBean("producer");for(int i=0;i<10;i++){rantzMarketingGateway.sendObjectInfo();System.out.println("Start ...");}}}
package activemq.test.topic;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;import activemq.test.model.User;public class MyListener implements MessageListener {private ActiveMQConnectionFactory connectionFactory;private Connection connection;private Session session;private MessageProducer producer;private Topic topic; private Topic control;public Topic getTopic() {return topic;}public void setTopic(Topic topic) {this.topic = topic;}public Topic getControl() {return control;}public void setControl(Topic control) {this.control = control;}public ActiveMQConnectionFactory getConnectionFactory() {return connectionFactory;}public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}public void onMessage(Message message) {try{if (checkText(message, "SHUTDOWN")) { try { connection.close(); System.out.println("退出监听消息"); } catch (Exception e) { e.printStackTrace(System.out); } } else if (checkText(message, "REPORT")) { // send a report: try { System.out.println("MyListener->收到 a report"); long time = System.currentTimeMillis(); String msg = "MyListener->返回 a report :" + time + "ms"; System.out.println(msg); producer.send(session.createTextMessage(msg)); } catch (Exception e) { e.printStackTrace(System.out); } } else { ObjectMessage obj = (ObjectMessage)message; User u = (User) obj.getObject(); System.out.println("Received messages."+ u.toString()); }}catch(Exception e){}}public void run() throws JMSException {if(connectionFactory!=null){System.out.println("connectionFactory is ok");connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(this); connection.start(); producer = session.createProducer(control); System.out.println("Waiting for messages...");}}private static boolean checkText(Message m, String s) { try { return m instanceof TextMessage && ((TextMessage)m).getText().equals(s); } catch (JMSException e) { e.printStackTrace(System.out); return false; } }}
package activemq.test.topic;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartListener {public static void main(String[] args) {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");MyListener myListener= (MyListener) context.getBean("myListener");try {if(myListener!=null){System.out.println("success...");}myListener.run();} catch (Exception e) {e.printStackTrace();} }}
package activemq.test.topic;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;import activemq.test.model.User;public class MyPublisher implements MessageListener {private ActiveMQConnectionFactory connectionFactory;private Connection connection;private Session session;private MessageProducer publisher;private Topic topic;private Topic control;private final Object mutex = new Object();public ActiveMQConnectionFactory getConnectionFactory() {return connectionFactory;}public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}public Topic getTopic() {return topic;}public void setTopic(Topic topic) {this.topic = topic;}public Topic getControl() {return control;}public void setControl(Topic control) {this.control = control;}public void onMessage(Message message) { synchronized (mutex) { System.out.println("Received report " + getReport(message) ); } }Object getReport(Message m) { try { return ((TextMessage)m).getText(); } catch (JMSException e) { e.printStackTrace(System.out); return e.toString(); } }public void publish() throws Exception { User u = new User();u.setAge(17);u.setName("yuky"+System.currentTimeMillis());u.setSex("女"); // send eventsObjectMessage obj = session.createObjectMessage();obj.setObject(u); for (int i = 0; i < 10; i++) { publisher.send(obj); publisher.send(session.createTextMessage("REPORT")); } } public void run() throws Exception {connection = connectionFactory.createConnection();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);publisher = session.createProducer(topic);publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);session.createConsumer(control).setMessageListener(this); connection.start();}public void stop() throws JMSException{publisher.send(session.createTextMessage("SHUTDOWN"));connection.stop(); connection.close();}}
package activemq.test.topic;import javax.jms.JMSException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartPublisher {public static void main(String[] args) throws InterruptedException {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");MyPublisher publisher= (MyPublisher) context.getBean("myPublisher");try {publisher.run();publisher.publish();} catch (Exception e) {try {publisher.stop();} catch (JMSException e1) {e1.printStackTrace();}e.printStackTrace();} }}