首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

ActiveMQ 示例

2012-11-09 
ActiveMQ 示例:ActiveMQ 点对点消费者、生产者示例。ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(...) ……

ActiveMQ 示例
    ActiveMQ 点对点消费者、生产者示例(以下为生产者代码)

// Point-to-point producer example: reads one local file and sends its bytes
// to the queue "test.queue.zj_02" as a single persistent StreamMessage.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
Connection connection = null;
Session session = null;
MessageProducer producer = null;
Destination destination = null;
try {
    connection = connectionFactory.createConnection("zengjun", "zj");
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = session.createQueue("test.queue.zj_02");
    producer = session.createProducer(destination);
    // Delivery mode: can be persistent or non-persistent.
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();
    double d = Math.random();

    byte[] bBuffer;
    int len = 0;
    InputStream in = new BufferedInputStream(new FileInputStream("d:\\2.09m.jar"));
    try {
        bBuffer = new byte[in.available()];
        // A single read() may deliver fewer bytes than requested, so keep
        // reading until the buffer is full or the stream ends.
        while (len < bBuffer.length) {
            int count = in.read(bBuffer, len, bBuffer.length - len);
            if (count < 0) {
                break;
            }
            len += count;
        }
    } finally {
        // Close the file even when reading fails.
        in.close();
    }

    // Create the StreamMessage.
    StreamMessage message = session.createStreamMessage();
    // Add the byte-array payload (only the bytes actually read).
    message.writeBytes(bBuffer, 0, len);
    // Add an int property carrying the payload length.
    message.setIntProperty("messagelength1", len);
    // Add a string property carrying the random id.
    message.setStringProperty("id", String.valueOf(d));
    producer.send(message);
    System.out.println("发送消息成功 id " + String.valueOf(d));
} catch (JMSException e) {
    e.printStackTrace();
} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    // Release resources innermost first: producer, then session, then connection.
    ConnectionUtil.closeMessageProducer(producer);
    ConnectionUtil.closeSession(session);
    ConnectionUtil.closeConnection(connection);
}
System.out.println("发送结束");


activemq 点对点消费者代码示例
// 创建连接工厂        activemqconnectionfactory connectionfactory = new activemqconnectionfactory("tcp://10.10.40.174:61616");        session session = null;        messageconsumer consumer = null;        connection connection = null;        message message = null;        try {            // 访问的用户与密码            connection = connectionfactory.createconnection("ll", "ll");            connection.start();            session = connection.createsession(false, session.client_acknowledge);            destination destination = session.createqueue("test.queue.zj_02");            consumer = session.createconsumer(destination);            // 无时间参数表示一直等待,直到收到消息。            // message = consumer.receive();            // 有时间参数表示指定时间后没有消息则结束时,如果存在消息就在取完消息后结束            message = consumer.receive(5 * 1000);            // 立即往下执行            // message = consumer.receivenowait();            if (message != null) {                system.out.println("收到消息");                if (message instanceof textmessage) {                    textmessage textmessage = (textmessage) message;                    string text = textmessage.gettext();                    system.out.println("text:" + text);                    textmessage.acknowledge();                } else if (message instanceof streammessage) {                    streammessage streammessage = (streammessage) message;                    string strid = streammessage.getstringproperty("id");                    system.out.println("streammessage  id:" + strid);                    streammessage.acknowledge();                }            } else {                system.out.println("没有收到消息");            }        } catch (exception e) {            system.out.println("发生异常\n");            e.printstacktrace();        } finally {            //关闭资源            connectionutil.closeall(connection, session, consumer);        }

activemq 发布/订阅,发布者代码示例
// Publish/subscribe publisher example: publishes one persistent message to
// the topic "test.topic.zj" - a TextMessage when flag == 1, otherwise a
// StreamMessage holding the bytes of a local file.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
TopicConnection connection = null;
ActiveMQTopicSession session = null;
ActiveMQTopicPublisher publisher = null;
ActiveMQTopic topic = null;
try {
    connection = connectionFactory.createTopicConnection("zengjun", "zj");
    session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    topic = (ActiveMQTopic) session.createTopic("test.topic.zj");
    publisher = (ActiveMQTopicPublisher) session.createPublisher(topic);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();
    // Demo switch: 1 publishes a text message, any other value a stream message.
    int flag = 2;
    if (flag == 1) {
        TextMessage messageText = session.createTextMessage();
        messageText.setText("tipic:" + System.currentTimeMillis());
        publisher.publish(messageText);
    } else {
        StreamMessage messageStream = session.createStreamMessage();
        byte[] bytes;
        int length = 0;
        FileInputStream fi = new FileInputStream("d:\\javaxyq.zip");
        try {
            bytes = new byte[fi.available()];
            // A single read() may deliver fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends.
            while (length < bytes.length) {
                int count = fi.read(bytes, length, bytes.length - length);
                if (count < 0) {
                    break;
                }
                length += count;
            }
        } finally {
            // Close the file even when reading fails.
            fi.close();
        }
        messageStream.writeBytes(bytes, 0, length);
        publisher.publish(messageStream);
    }
    System.out.println("topic消息发送成功");
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Release resources innermost first: publisher, then session, then connection.
    ConnectionUtil.closeTopicPublisher(publisher);
    ConnectionUtil.closeSession(session);
    ConnectionUtil.closeConnection(connection);
}

activemq 发布/订阅,持久订阅代码示例
// Publish/subscribe durable-subscriber example. These statements are the body
// of a method whose header is not part of this excerpt; they register this
// object as the listener of a durable subscription on "test.topic.zj".
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
TopicConnection connection = null;
ActiveMQTopicSession session = null;
ActiveMQTopic topic = null;
ActiveMQTopicSubscriber subscriber = null;
try {
    connection = connectionFactory.createTopicConnection("zengjun", "zj");
    connection.setClientID("client_id_test");
    connection.start();
    session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    topic = (ActiveMQTopic) session.createTopic("test.topic.zj");
    // Create the durable subscription.
    subscriber = (ActiveMQTopicSubscriber) session.createDurableSubscriber(topic, "subscriber_name_test");
    subscriber.setMessageListener(this);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // The close calls are disabled in the original; presumably the resources
    // stay open so the listener keeps receiving - confirm before enabling.
    // ConnectionUtil.closeSession(session);
    // ConnectionUtil.closeConnection(connection);
    // ConnectionUtil.closeTopicSubscriber(subscriber);
}
} // closes the enclosing method, whose header is outside this excerpt

/**
 * Listener callback: prints a notice for each delivered message, prints the
 * text of a TextMessage, then pauses for one second.
 *
 * @param message the delivered message
 */
public void onMessage(Message message) {
    System.out.println("收到消息");
    try {
        if (message != null) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            } else if (message instanceof StreamMessage) {
                System.out.println("收到steam消息");
            }
        } else {
            System.out.println("没有收到消息");
        }
        Thread.sleep(1 * 1000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread can still see it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}



附件是我在网上搜集的一些资料,以及自己的一些体会文档 

热点排行