(六) 阻塞队列
阻塞队列(BlockingQueue)
对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。
例如:生产者线程向队列插入元素,消费者线程则取出它们。
在协调多个线程之间的合作时,阻塞队列是一个有用的工具。工作者线程可以周期性将中间结果存储在阻塞队列中。其他工作者的线程移出中间结果并进一步加以修改。队列会自动平衡负载。
阻塞队列(java.util.concurrent.BlockingQueue)的方法
方法??? ??? ??? 正常动作??? ??? ??? ??? ??? ??? 特殊情况下的动作
add??? ??? ??? 添加一个元素??? ??? ??? ??? 如果队列满,则抛出IllegalStateException异常
element??? ??? 返回队列的头元素??? ??? ??? 如果队列空,则抛出NoSuchElementException异常
remove??? ??? 移出并返回头元素??? ??? ??? 如果队列空,则抛出NoSuchElementException异常
offer??? ??? 添加一个元素并返回true??? ??? 如果队列满,返回false
peek??? ??? 返回队列的头元素??? ??? ??? 如果队列空,则返回null
poll??? ??? 移出并返回队列的头元素??? ??? 如果队列空,则返回null
put??? ??? ??? 添加一个元素??? ??? ??? ??? 如果队列满,则阻塞
take??? ??? 移出并返回头元素??? ??? ??? 如果队列空,则阻塞
阻塞队列方法分为以下3类,取决于当队列满或空时它们的响应方式
1.如果将阻塞队列当做线程管理工具来使用,将要用到put和take方法。如果队列满用put方法阻塞,如果队列空用take方法阻塞。
2.当试图想满的阻塞队列中添加或从空的阻塞队列中返回或移出元素时,add、element、remove操作将抛出异常
3.在一个多线程程序中,阻塞队列会在任何时刻空或满,因此一定要用offer、peek、poll方法作为替代。这些方法如果不能完成任务,只是给一个错误提示而不会抛出异常。
注意:peek和poll方法返回空来指示失败。因此,向这些队列中插入null值是非法的。
带有超时的offer方法和poll方法(上述方法中只有offer和poll方法存在超时参数):
offer(E e, long timeout, TimeUnit unit) : 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
poll(long timeout, TimeUnit unit) : 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
e.g.boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS)
??? Object head = q.poll(100, TimeUnit.MILLISECONDS)
java.util.concurrent包提供的阻塞队列实现类:
(1)LinkedBlockingQueue : 默认情况下,LinkedBlockingQueue的容量没有上边界,但是也可以选择指定大容量。
(2)LinkedBlockingDeque : 是一个双端的版本。
(3)ArrayBlockingQueue : 在构件时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则那么等待了最长时间的的线程会优先得到处理。通常公平性会降低性能,只有在确实非常需要时才使用它。
(4)PriorityBlockingQueue : 是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容量上限的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError);如果队列是空的,取元素的操作会阻塞。
(5)DelayQueue : 包含实现Delayed接口的对象:getDelay方法返回对象的残留延迟,负值表示延迟已经结束。元素只有在延迟用完的情况下才能从DelayQueue移除。还必须实现compareTo方法。DelayQueue使用该方法对元素进行排序。
public interface Delayed extends Comparable<Delayed> {
??? long getDelay(TimeUnit unit);
}
?
?
DEMO:FileEnumerationTask的中File DUMMY = new File("")作用是作为最后一个元素take放入进去,当用于读取的线程读取到DUMMY后,表示全部任务完成,终止读取,防止最后一次put一直阻塞线程,导致线程无法终止。
?
import java.io.File;import java.util.Scanner;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueTest {private final static int FILE_QUEUE_SIZE = 10;private final static int SEARCH_THREADS = 1;public static void main(String[] args) {Scanner in = new Scanner(System.in);System.out.println("Enter base directory (e.g. D:\\EDPWorkspace):");String directory = in.nextLine();System.out.println("Enter keyword (e.g. python):");String keyword = in.nextLine();System.out.println(directory + "," + keyword);BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));new Thread(enumerator).start();new Thread(new SearchTask(queue, keyword)).start();}}
?
import java.io.File;import java.util.concurrent.BlockingQueue;public class FileEnumerationTask implements Runnable{private BlockingQueue<File> queue;private File startDirectory;public static File DUMMY = new File("");public FileEnumerationTask(BlockingQueue<File> queue, File startDirectory){this.queue = queue;this.startDirectory = startDirectory;}@Overridepublic void run(){try{enumerate(startDirectory);queue.put(DUMMY);}catch(Exception e){e.printStackTrace();}}public void enumerate(File directory) throws InterruptedException{File[] files = directory.listFiles();for(File file:files){if(file.isDirectory()){enumerate(file);}else{queue.put(file);}}}}
?
import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.BlockingQueue;public class SearchTask implements Runnable {private BlockingQueue<File> queue;private String keyword;public SearchTask(BlockingQueue<File> queue, String keyword){this.queue = queue;this.keyword = keyword;}@Overridepublic void run() {try{boolean done = false;while(!done){File file = queue.take();if(file == FileEnumerationTask.DUMMY){queue.put(file);done = true;}else{search(file);}}}catch(IOException e){e.printStackTrace();}catch(InterruptedException ex){ex.printStackTrace();}}public void search(File file) throws IOException{Scanner in = new Scanner(new FileInputStream(file));int lineNumber = 0;while(in.hasNextLine()){lineNumber++;String line = in.nextLine();if(line.contains(keyword)){System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);}}in.close();}}