JMS 使用 ActiveMQ 传送文件
这里使用的 MQ 中间件是开源的 ActiveMQ,我们没有采用 BytesMessage 来按字节传送文件,而是 ActiveMQ 为我们提供了 org.apache.activemq.BlobMessage,可以用它来传送大对象。org.apache.activemq.ActiveMQSession 中有以下几个创建 BlobMessage 对象的方法:
createBlobMessage(URL url)
createBlobMessage(URL url, boolean deletedByBroker)
createBlobMessage(File file)
createBlobMessage(InputStream in)
接收到 BlobMessage 消息后,可以调用其 getInputStream() 方法获得数据,然后写成磁盘文件,文件名、文件大小等可通过 Message 的 getXxxProperty("Property.Name") 取的。
注 意,传输入文件的时候,发送方创建 ConnectionFactory 时的 brokerURL 需要指定 jms.blobTransferPolicy.uploadUrl 或者jms.blobTransferPolicy.defaultUploadUrl 属性为 ActiveMQ 中 fileserver 应用的 URI,即指定传输 BlogMessage 的 BlobTransferPolicy 策略,参看 Configuring the BLOB Transfer Policy。
1. 启动 ActiveMQ
2. 编写发送文件的程序 FileSender.java
package org.laurel.jms;import java.io.File;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.swing.JFileChooser;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQSession;import org.apache.activemq.BlobMessage;/** * 通过 ActiveMQ 发送文件的程序 * * @author wb-liufei * */public class FileSender {/** * @param args * @throws JMSException */public static void main(String[] args) throws JMSException {// 选择文件JFileChooser fileChooser = new JFileChooser();fileChooser.setDialogTitle("请选择要传送的文件");if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) {return;}File file = fileChooser.getSelectedFile();// 获取 ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/");// 创建 ConnectionConnection connection = connectionFactory.createConnection();connection.start();// 创建 SessionActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建 DestinationDestination destination = session.createQueue("File.Transport");// 创建 ProducerMessageProducer producer = session.createProducer(destination);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 设置为非持久性// 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件// 构造 BlobMessage,用来传输文件//如果设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 消息持久性的话,//发送方传文件的时候,接收方可以不在线,文件会暂存在 ActiveMQ 服务器上,等到接收程序上线后仍然可以收到发过来的文件。BlobMessage blobMessage = session.createBlobMessage(file);blobMessage.setStringProperty("FILE.NAME", file.getName());blobMessage.setLongProperty("FILE.SIZE", file.length());System.out.println("开始发送文件:" + file.getName() + ",文件大小:" + file.length() + " 字节");// 7. 发送文件producer.send(blobMessage);System.out.println("完成文件发送:" + file.getName());producer.close();session.close();connection.close(); // 不关闭 Connection, 程序则不退出}}
package org.laurel.jms;import java.io.File;import java.io.FileOutputStream;import java.io.InputStream;import java.io.OutputStream;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.swing.JFileChooser;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.BlobMessage;public class FileReciever {/** * @param args * @throws JMSException */public static void main(String[] args) throws JMSException {// 获取 ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建 ConnectionConnection connection = connectionFactory.createConnection();connection.start();// 创建 SessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建 DestinationeDestination destination = session.createQueue("File.Transport");// 创建 ConsumerMessageConsumer consumer = session.createConsumer(destination);// 注册消息监听器,当消息到达时被触发并处理消息consumer.setMessageListener(new MessageListener() {// 监听器中处理消息public void onMessage(Message message) {if (message instanceof BlobMessage) {BlobMessage blobMessage = (BlobMessage) message;try {String fileName = blobMessage.getStringProperty("FILE.NAME");System.out.println("文件接收请求处理:" + fileName + ",文件大小:" + blobMessage.getLongProperty("FILE.SIZE")+ " 字节");JFileChooser fileChooser = new JFileChooser();fileChooser.setDialogTitle("请指定文件保存位置");fileChooser.setSelectedFile(new File(fileName));if (fileChooser.showSaveDialog(null) == JFileChooser.APPROVE_OPTION) {File file = fileChooser.getSelectedFile();OutputStream os = new FileOutputStream(file);System.out.println("开始接收文件:" + fileName);InputStream inputStream = blobMessage.getInputStream();// 写文件,你也可以使用其他方式byte[] buff = new byte[256];int len = 0;while ((len = inputStream.read(buff)) > 0) {os.write(buff, 0, len);}os.close();System.out.println("完成文件接收:" + fileName);}} catch (Exception e) {e.printStackTrace();}}}});}}