首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

Spring-ActiveMQ的点对点跟Topic

2013-10-12 
Spring-ActiveMQ的点对点和Topicproject xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://

Spring-ActiveMQ的点对点和Topic

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>com.example.activemq</groupId>  <artifactId>activemq-test</artifactId>  <version>1.0-SNAPSHOT</version>  <packaging>jar</packaging>  <name>activemq-test</name>  <url>http://maven.apache.org</url>  <properties>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>    <org.springframework.version>3.1.1.RELEASE</org.springframework.version>  </properties>  <repositories>  <repository><id>kxcomm-maven</id><name>Maven kxcomm Repository</name><url>http://122.13.0.56:8088/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository>   </repositories>  <dependencies>    <dependency>      <groupId>junit</groupId>      <artifactId>junit</artifactId>      <version>3.8.1</version>      <scope>test</scope>    </dependency>    <dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2</version></dependency><dependency><groupId>commons-configuration</groupId><artifactId>commons-configuration</artifactId><version>1.6</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.1.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.8.3</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-asm</artifactId><version>${org.springframework.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${org.springframework.version}</version></dependency><dependency><groupId>com.davidkarlsen.commonstransaction.spring</groupId><artifactId>commons-transaction-spring</artifactId><version>0.9</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.2-beta1</version></dependency><dependency><groupId>fastutil</groupId><artifactId>fastutil</artifactId><version>5.0.9</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>0.9.27</version><scope>compile</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>0.9.27</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.1</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId><version>1.6.1</version><scope>runtime</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.5</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.8.0</version></dependency>  </dependencies></project>


<?xml version="1.0" encoding="UTF-8"?><beans    xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:aop="http://www.springframework.org/schema/aop"     xmlns:tx="http://www.springframework.org/schema/tx"     xmlns:context="http://www.springframework.org/schema/context"     xsi:schemaLocation="                 http://www.springframework.org/schema/beans                http://www.springframework.org/schema/beans/spring-beans-2.5.xsd                http://www.springframework.org/schema/aop                http://www.springframework.org/schema/aop/spring-aop-2.5.xsd                http://www.springframework.org/schema/tx                http://www.springframework.org/schema/tx/spring-tx-2.5.xsd                http://www.springframework.org/schema/context                http://www.springframework.org/schema/context/spring-context-2.5.xsd"                 default-autowire="byName" default-lazy-init="true"><import resource="activemq-test.xml"/></beans>


<?xml version="1.0" encoding="UTF-8"?><beans  xmlns="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"><!-- 创建工厂连接 --><bean id="connectionFactory" value="tcp://localhost:61616" /></bean><bean id="jmsTemplate" ref="connectionFactory" /> <property name="defaultDestination" ref="rantzDestination" />  </bean><!-- Point-to-Point --><!-- activeMQ消息目标 队列 --><bean id="rantzDestination" value="rantz.marketing.queue"></constructor-arg></bean><!-- activeMQ消息目标 主题-->        <!--<bean id="rantzDestination" value="rantz.marketing.queue"></constructor-arg>-->        <!--</bean>-->        <bean id="producer" ref="jmsTemplate" /><property name="destination" ref="rantzDestination" /></bean><bean id="consumer" ref="jmsTemplate" />  </bean>  <!-- Point-to-Point End--><!-- Topic --><bean id="topic" autowire="constructor">        <constructor-arg index="0" value="kxcomm.mms.topic" />    </bean>    <bean id="control" autowire="constructor">        <constructor-arg index="0" value="kxcomm.mms.control" />    </bean><bean id="myListener" ref="connectionFactory" />    <property name="topic" ref="topic" />    <property name="control" ref="control" />    </bean>      <bean id="myPublisher" ref="connectionFactory" />    <property name="topic" ref="topic" />    <property name="control" ref="control" />    </bean><!-- Topic End--></beans>


package activemq.test.model;import java.io.Serializable;public class User implements Serializable{private static final long serialVersionUID = -3098636047897519268L;private String name;private String sex;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getSex() {return sex;}public void setSex(String sex) {this.sex = sex;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User [name=" + name + ", sex=" + sex + ", age=" + age + "]";}}


PTP模型
PTP(Point-to-Point)模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具管理队列的创建、删除。JMS PTP 模型定义了客户端如何向队列发送消息,从队列接收消息,浏览队列中的消息。
package activemq.test.p2p.consumer;import org.springframework.jms.core.JmsTemplate;import activemq.test.model.User;public class MarketingReceiverGatewayImpl {private JmsTemplate jmsTemplate;public JmsTemplate getJmsTemplate() {return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}public MarketingReceiverGatewayImpl() {}public void receiveMotorist() throws Exception{User message  = (User)jmsTemplate.receiveAndConvert();System.out.println("reviced msg is:" + message.toString());}}

package activemq.test.p2p.consumer;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartConsumer {public static void main(String[] args) {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");MarketingReceiverGatewayImpl rantzMarketingGateway= (MarketingReceiverGatewayImpl) context.getBean("consumer");System.out.println("Receive Start ...");try {while(true){rantzMarketingGateway.receiveMotorist();}} catch (Exception e) {e.printStackTrace();}    }}

package activemq.test.p2p.producer;public interface IRantzMarketingGateway {/** *  * 发送文本对象 *  * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */public void sendMotoristInfo();/** *  * 发送对象 *  * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */public void sendObjectInfo();}

package activemq.test.p2p.producer;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import activemq.test.model.User;public class RantzMarketingGatewayImpl implements IRantzMarketingGateway {private JmsTemplate jmsTemplate;private Destination destination;public JmsTemplate getJmsTemplate() {return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}public Destination getDestination() {return destination;}public void setDestination(Destination destination) {this.destination = destination;}public void sendMotoristInfo() {MessageCreator msg = new MessageCreator(){public Message createMessage(Session session) throws JMSException {return session.createTextMessage("这是一个测试,"+System.currentTimeMillis());}};jmsTemplate.send(destination, msg);}public void sendObjectInfo() {User u = new User();u.setAge(17);u.setName("yuky"+System.currentTimeMillis());u.setSex("女");jmsTemplate.convertAndSend(u);}}

package activemq.test.p2p.producer;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartProducer {public static void main(String[] args) {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");IRantzMarketingGateway rantzMarketingGateway= (RantzMarketingGatewayImpl) context.getBean("producer");for(int i=0;i<10;i++){rantzMarketingGateway.sendObjectInfo();System.out.println("Start ...");}}}


PUB/SUB模型
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。

package activemq.test.topic;import javax.jms.Connection;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;import activemq.test.model.User;public class MyListener implements MessageListener {private ActiveMQConnectionFactory connectionFactory;private Connection connection;private Session session;private MessageProducer producer;private Topic topic;    private Topic control;public Topic getTopic() {return topic;}public void setTopic(Topic topic) {this.topic = topic;}public Topic getControl() {return control;}public void setControl(Topic control) {this.control = control;}public ActiveMQConnectionFactory getConnectionFactory() {return connectionFactory;}public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}public void onMessage(Message message) {try{if (checkText(message, "SHUTDOWN")) {            try {                connection.close();                System.out.println("退出监听消息");            } catch (Exception e) {                e.printStackTrace(System.out);            }        } else if (checkText(message, "REPORT")) {            // send a report:            try {             System.out.println("MyListener->收到 a report");                long time = System.currentTimeMillis();                String msg = "MyListener->返回 a report :" + time + "ms";                System.out.println(msg);                producer.send(session.createTextMessage(msg));            } catch (Exception e) {                e.printStackTrace(System.out);            }        } else {        ObjectMessage obj = (ObjectMessage)message;        User u = (User) obj.getObject();        System.out.println("Received  messages."+ u.toString());        }}catch(Exception e){}}public void run() throws JMSException {if(connectionFactory!=null){System.out.println("connectionFactory is ok");connection = connectionFactory.createConnection();        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        MessageConsumer consumer = session.createConsumer(topic);        consumer.setMessageListener(this);        connection.start();                producer = session.createProducer(control);        System.out.println("Waiting for messages...");}}private static boolean checkText(Message m, String s) {        try {            return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);        } catch (JMSException e) {            e.printStackTrace(System.out);            return false;        }    }}

package activemq.test.topic;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartListener {public static void main(String[] args) {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");MyListener myListener= (MyListener) context.getBean("myListener");try {if(myListener!=null){System.out.println("success...");}myListener.run();} catch (Exception e) {e.printStackTrace();}    }}

package activemq.test.topic;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnectionFactory;import activemq.test.model.User;public class MyPublisher implements MessageListener {private ActiveMQConnectionFactory connectionFactory;private Connection connection;private Session session;private MessageProducer publisher;private Topic topic;private Topic control;private final Object mutex = new Object();public ActiveMQConnectionFactory getConnectionFactory() {return connectionFactory;}public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}public Topic getTopic() {return topic;}public void setTopic(Topic topic) {this.topic = topic;}public Topic getControl() {return control;}public void setControl(Topic control) {this.control = control;}public void onMessage(Message message) {        synchronized (mutex) {            System.out.println("Received report " + getReport(message) );                    }    }Object getReport(Message m) {        try {            return ((TextMessage)m).getText();        } catch (JMSException e) {            e.printStackTrace(System.out);            return e.toString();        }    }public void publish() throws Exception { User u = new User();u.setAge(17);u.setName("yuky"+System.currentTimeMillis());u.setSex("女");        // send eventsObjectMessage obj = session.createObjectMessage();obj.setObject(u);        for (int i = 0; i < 10; i++) {            publisher.send(obj);            publisher.send(session.createTextMessage("REPORT"));        }    } public void run() throws Exception {connection = connectionFactory.createConnection();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);publisher = session.createProducer(topic);publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);session.createConsumer(control).setMessageListener(this);        connection.start();}public void stop() throws JMSException{publisher.send(session.createTextMessage("SHUTDOWN"));connection.stop();        connection.close();}}

package activemq.test.topic;import javax.jms.JMSException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class StartPublisher {public static void main(String[] args) throws InterruptedException {/*开始加载spring配置文件*/ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");MyPublisher publisher= (MyPublisher) context.getBean("myPublisher");try {publisher.run();publisher.publish();} catch (Exception e) {try {publisher.stop();} catch (JMSException e1) {e1.printStackTrace();}e.printStackTrace();}    }}

热点排行