闭锁/栅栏/信号量/FutureTask分析及使用
闭锁/栅栏/信号量/FutureTask分析及使用
?
1、闭锁
?
public class CountDownLatchTest {public void timeTasks(int nThreads, final Runnable task) throws InterruptedException{final CountDownLatch startGate = new CountDownLatch(1);final CountDownLatch endGate = new CountDownLatch(nThreads);for(int i = 0; i < nThreads; i++){Thread t = new Thread(){public void run(){try{startGate.await();try{task.run();}finally{endGate.countDown();}}catch(InterruptedException ignored){}}};t.start();}long start = System.nanoTime();System.out.println("打开闭锁");startGate.countDown();endGate.await();long end = System.nanoTime();System.out.println("闭锁退出,共耗时" + (end-start));}public static void main(String[] args) throws InterruptedException{CountDownLatchTest test = new CountDownLatchTest();test.timeTasks(5, test.new RunnableTask());}class RunnableTask implements Runnable{@Overridepublic void run() {System.out.println("当前线程为:" + Thread.currentThread().getName());}}
?
执行结果为:打开闭锁当前线程为:Thread-0当前线程为:Thread-3当前线程为:Thread-2当前线程为:Thread-4当前线程为:Thread-1闭锁退出,共耗时1109195
?2、栅栏
?
public class CyclicBarrierTest {private final CyclicBarrier barrier;private final Worker[] workers;public CyclicBarrierTest(){int count = Runtime.getRuntime().availableProcessors();this.barrier = new CyclicBarrier(count,new Runnable(){@Overridepublic void run() {System.out.println("所有线程均到达栅栏位置,开始下一轮计算");}});this.workers = new Worker[count];for(int i = 0; i< count;i++){workers[i] = new Worker(i);}}private class Worker implements Runnable{int i;public Worker(int i){this.i = i;}@Overridepublic void run() {for(int index = 1; index < 3;index++){System.out.println("线程" + i + "第" + index + "次到达栅栏位置,等待其他线程到达");try {//注意是await,而不是waitbarrier.await();} catch (InterruptedException e) {e.printStackTrace();return;} catch (BrokenBarrierException e) {e.printStackTrace();return;}}}}public void start(){for(int i=0;i<workers.length;i++){new Thread(workers[i]).start();}}public static void main(String[] args){new CyclicBarrierTest().start();}}
?
执行结果为:线程0第1次到达栅栏位置,等待其他线程到达线程1第1次到达栅栏位置,等待其他线程到达线程2第1次到达栅栏位置,等待其他线程到达线程3第1次到达栅栏位置,等待其他线程到达所有线程均到达栅栏位置,开始下一轮计算线程3第2次到达栅栏位置,等待其他线程到达线程2第2次到达栅栏位置,等待其他线程到达线程0第2次到达栅栏位置,等待其他线程到达线程1第2次到达栅栏位置,等待其他线程到达所有线程均到达栅栏位置,开始下一轮计算
?3、信号量
?
public class SemaphoreTest<T> {private final Set<T> set;private final Semaphore sema;public SemaphoreTest(int bound){this.set = Collections.synchronizedSet(new HashSet<T>());this.sema = new Semaphore(bound);}public boolean add(T o) throws InterruptedException{sema.acquire();boolean wasAdded = false;try{wasAdded = set.add(o);return wasAdded;}finally{if(!wasAdded){sema.release();}}}public boolean remove(T o){boolean wasRemoved = set.remove(o);if(wasRemoved){sema.release();}return wasRemoved;}public static void main(String[] args) throws InterruptedException{int permits = 5;int elements = permits + 1;SemaphoreTest<Integer> test = new SemaphoreTest<Integer>(permits);for(int i = 0;i < elements; i++){test.add(i);}}}
?输出结果:由于实际待添加的元素个数大于信号量所允许的数量,因此最后一次添加时,会一直阻塞。
?
当一个计算的代价比较高,譬如比较耗时,或者耗资源,为了避免重复计算带来的浪费,当第一次计算后,通常会将结果缓存起来。比较常见的方式就是使用synchronized进行同步,但该方式带来的代价是被同步的代码只能被串行执行,如果有多个线程在排队对待计算结果,那么针对最后一个线程的计算时间可能比没有使用缓存的时间会更长。
第二种方式是采用ConcurrentHashMap,但对于耗时比较长的计算过程来说,该方式也存在一个漏洞。如果在第一个线程正在计算的过程中,第二个线程开始获取结果,会发现缓存里没有缓存结果,因此第二个线程又启动了同样的计算,这样就导致重复计算,违背了缓存的初衷。计算过程越长,则出现这种重复计算的几率就会越大。
通过第二种方式的缺点分析,得知真正要缓存的应该是计算是否已被启动,而不是等待漫长的计算过程结束后,再缓存结果。一旦从缓存中得知某个计算过程已被其他线程启动,则当前线程不需要再重新启动计算,只需要阻塞等待计算结果的返回。FutureTask就是实现该功能的最佳选择。
?
public class Memoizer<A, V> implements Computable<A, V> {private ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();private final Computable<A, V> c;public Memoizer(Computable<A, V> c) {this.c = c;}@Overridepublic V compute(final A a) {while (true) {Future<V> f = cache.get(a);if (null == f) {Callable<V> eval = new Callable<V>() {@Overridepublic V call() throws Exception {return c.compute(a);}};FutureTask<V> ft = new FutureTask<V>(eval);f = cache.putIfAbsent(a, ft);if (null == f) {f = ft;ft.run();}}try {return f.get();} catch (InterruptedException e) {e.printStackTrace();cache.remove(a, f);} catch (ExecutionException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException,ExecutionException {Computable<Integer, String> c = new Computable<Integer, String>() {@Overridepublic String compute(Integer a) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}String result = "由线程:" + Thread.currentThread().getName()+ "计算得到" + a + "的结果";return result;}};Memoizer<Integer, String> memoizer = new Memoizer<Integer, String>(c);ExecutorService executorService = Executors.newFixedThreadPool(3);for (int i = 0; i < 3; i++) {Task<Integer, String> task = new Task<Integer, String>(memoizer, 10);String result = executorService.submit(task).get();System.out.println(result);}executorService.shutdown();}}
结果如下:由线程:pool-1-thread-1计算得到10的结果由线程:pool-1-thread-1计算得到10的结果由线程:pool-1-thread-1计算得到10的结果当然如果仅仅只是采用FutureTask,仅仅只是减小了启动同一个计算过程的概率。当两个线程同时经过compute方法时,还是会出现重复启动同一个计算的情况,上面的例子通过结合ConcurrentHashMap 的putIfAbsent方法解决了这个问题
?