ActiveMQ 示例
ActiveMQ 点对点消费者生产者示例
// Point-to-point (queue) producer example: connects to the broker, reads a local
// file into memory and sends it as a single persistent StreamMessage to the
// queue "test.queue.zj_02".
// NOTE(review): ConnectionUtil is a project helper that is not shown in this
// excerpt; its method names are assumed to follow standard camelCase - confirm.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
    connection = connectionFactory.createConnection("zengjun", "zj");
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Destination destination = session.createQueue("test.queue.zj_02");
    producer = session.createProducer(destination);
    // Delivery mode can be persistent or non-persistent; persistent is used here.
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();

    // Random value used as the message's "id" property.
    String id = String.valueOf(Math.random());

    // Load the file. The buffer is sized from available(), as before, but the
    // read is now looped because a single read() call is not guaranteed to fill
    // the buffer; `length` tracks how many bytes were actually read.
    byte[] buffer;
    int length = 0;
    InputStream in = new FileInputStream("d:\\2.09m.jar");
    try {
        buffer = new byte[in.available()];
        while (length < buffer.length) {
            int read = in.read(buffer, length, buffer.length - length);
            if (read < 0) {
                break; // end of stream reached earlier than available() suggested
            }
            length += read;
        }
    } finally {
        // Close the file even when reading fails.
        in.close();
    }

    // Create the StreamMessage and add the byte-array payload.
    StreamMessage message = session.createStreamMessage();
    message.writeBytes(buffer, 0, length);
    // Add an int property carrying the payload length.
    message.setIntProperty("messagelength1", length);
    // Add a string property carrying the id.
    message.setStringProperty("id", id);

    producer.send(message);
    System.out.println("发送消息成功 id " + id);
} catch (Exception ex) {
    // JMSException and every other failure were handled identically, so one
    // catch arm is enough.
    ex.printStackTrace();
} finally {
    // Release resources in reverse order of acquisition.
    ConnectionUtil.closeMessageProducer(producer);
    ConnectionUtil.closeSession(session);
    ConnectionUtil.closeConnection(connection);
}
System.out.println("发送结束");
// 创建连接工厂 activemqconnectionfactory connectionfactory = new activemqconnectionfactory("tcp://10.10.40.174:61616"); session session = null; messageconsumer consumer = null; connection connection = null; message message = null; try { // 访问的用户与密码 connection = connectionfactory.createconnection("ll", "ll"); connection.start(); session = connection.createsession(false, session.client_acknowledge); destination destination = session.createqueue("test.queue.zj_02"); consumer = session.createconsumer(destination); // 无时间参数表示一直等待,直到收到消息。 // message = consumer.receive(); // 有时间参数表示指定时间后没有消息则结束时,如果存在消息就在取完消息后结束 message = consumer.receive(5 * 1000); // 立即往下执行 // message = consumer.receivenowait(); if (message != null) { system.out.println("收到消息"); if (message instanceof textmessage) { textmessage textmessage = (textmessage) message; string text = textmessage.gettext(); system.out.println("text:" + text); textmessage.acknowledge(); } else if (message instanceof streammessage) { streammessage streammessage = (streammessage) message; string strid = streammessage.getstringproperty("id"); system.out.println("streammessage id:" + strid); streammessage.acknowledge(); } } else { system.out.println("没有收到消息"); } } catch (exception e) { system.out.println("发生异常\n"); e.printstacktrace(); } finally { //关闭资源 connectionutil.closeall(connection, session, consumer); }
// Publish/subscribe (topic) publisher example: connects to the broker and
// publishes one persistent message to the topic "test.topic.zj" - a TextMessage
// when flag == 1, otherwise a StreamMessage carrying the bytes of a local file.
// NOTE(review): ConnectionUtil is a project helper that is not shown in this
// excerpt; its method names are assumed to follow standard camelCase - confirm.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
TopicConnection connection = null;
ActiveMQTopicSession session = null;
ActiveMQTopicPublisher publisher = null;
try {
    connection = connectionFactory.createTopicConnection("zengjun", "zj");
    session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQTopic topic = (ActiveMQTopic) session.createTopic("test.topic.zj");
    publisher = (ActiveMQTopicPublisher) session.createPublisher(topic);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();

    // Manual switch between the two sample payloads: 1 = text, anything else = file.
    int flag = 2;
    if (flag == 1) {
        TextMessage messageText = session.createTextMessage();
        messageText.setText("tipic:" + System.currentTimeMillis());
        publisher.publish(messageText);
    } else {
        // Load the file. The buffer is sized from available(), as before, but
        // the read is now looped because a single read() call is not guaranteed
        // to fill the buffer; `length` tracks how many bytes were actually read.
        byte[] bytes;
        int length = 0;
        FileInputStream fi = new FileInputStream("d:\\javaxyq.zip");
        try {
            bytes = new byte[fi.available()];
            while (length < bytes.length) {
                int read = fi.read(bytes, length, bytes.length - length);
                if (read < 0) {
                    break; // end of stream reached earlier than available() suggested
                }
                length += read;
            }
        } finally {
            // The stream was previously never closed.
            fi.close();
        }

        StreamMessage messageStream = session.createStreamMessage();
        messageStream.writeBytes(bytes, 0, length);
        publisher.publish(messageStream);
    }
    System.out.println("topic消息发送成功");
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Release resources in reverse order of acquisition.
    ConnectionUtil.closeTopicPublisher(publisher);
    ConnectionUtil.closeSession(session);
    ConnectionUtil.closeConnection(connection);
}
// Publish/subscribe (topic) durable subscriber example: connects with a fixed
// client id, creates a durable subscription on the topic "test.topic.zj" and
// registers this object as the asynchronous message listener.
// NOTE(review): the header of the method that encloses these statements is not
// part of this excerpt; the lone closing brace after the try/catch ends it.
// NOTE(review): ConnectionUtil is a project helper that is not shown in this
// excerpt; its method names are assumed to follow standard camelCase - confirm.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://10.10.40.174:61616");
TopicConnection connection = null;
ActiveMQTopicSession session = null;
ActiveMQTopicSubscriber subscriber = null;
try {
    connection = connectionFactory.createTopicConnection("zengjun", "zj");
    connection.setClientID("client_id_test");
    connection.start();
    session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQTopic topic = (ActiveMQTopic) session.createTopic("test.topic.zj");
    // Create the durable subscription.
    subscriber = (ActiveMQTopicSubscriber) session.createDurableSubscriber(topic, "subscriber_name_test");
    subscriber.setMessageListener(this);
    // On success nothing is closed here: the connection, session and
    // subscriber stay open so the listener can keep receiving messages.
} catch (Exception e) {
    e.printStackTrace();
    // Setup failed part-way, so no listener is running; release whatever was
    // opened instead of leaking it.
    ConnectionUtil.closeTopicSubscriber(subscriber);
    ConnectionUtil.closeSession(session);
    ConnectionUtil.closeConnection(connection);
}
}

/**
 * Listener callback registered through {@code setMessageListener(this)}.
 * Prints the text of a {@code TextMessage}, prints a fixed notice for a
 * {@code StreamMessage}, and then pauses for one second.
 *
 * @param message the delivered message
 */
@Override
public void onMessage(Message message) {
    System.out.println("收到消息");
    try {
        if (message != null) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            } else if (message instanceof StreamMessage) {
                System.out.println("收到steam消息");
            }
        } else {
            System.out.println("没有收到消息");
        }
        Thread.sleep(1 * 1000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread can still observe
        // the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}