首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > VSTS >

寨子同步队列 VS 官方BT的ArrayBlockingQueue ,结果官方落马!

2012-10-12 
山寨同步队列 VS 官方BT的ArrayBlockingQueue ,结果官方落马!!!官方的java.util.concurrent.ArrayBlocking

山寨同步队列 VS 官方BT的ArrayBlockingQueue ,结果官方落马!!!
官方的java.util.concurrent.ArrayBlockingQueue的性能是很BT的,我下午无聊然后就想去测试下到底有多BT就写了如下测试代码,也不知道是我的代码写的有问题还是怎么的啦,测试结果和我想的完全不一样。

条件:20个线程,存取线程各半,队列大小是30W,其他电脑配置啥的啊很大众化就不描述了。

耗时:
山寨版的:2400左右
官方版的:3400左右

--------------------------------------------------------
官方的java.util.concurrent.ArrayBlockingQueue 生产者消费者代码如下:

package thread;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class ProducerCustomerQueue2<E> {public static void main(String[] args) throws InterruptedException {/** * 20个线程,存取线程各半,队列大小是30W,耗时:2400左右 */int total = 300000, threadCount = 20,queueSize = 100;ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);final long start = System.currentTimeMillis();CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {public void run() {System.out.println("统计时间:"+ (System.currentTimeMillis() - start));}});for (int i = 0; i < threadCount >>> 1; i++) {new Thread(new ProducerThread2(queue, barrier, total)).start();new Thread(new CustomerThread2(queue, barrier, total)).start();}}}// 生产者线程class ProducerThread2 implements Runnable {ArrayBlockingQueue<Integer> queue;CyclicBarrier barrier;int total;public ProducerThread2(ArrayBlockingQueue<Integer> queue,CyclicBarrier barrier, int total) {super();this.queue = queue;this.barrier = barrier;this.total = total;}public void run() {System.out.println("ProducerThread启动...");for (int i = 0; i < total; i++) {try {queue.put(i);} catch (InterruptedException e) {e.printStackTrace();}}try {System.out.println("ProducerThread完毕!");barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}// 消费者线程class CustomerThread2 implements Runnable {ArrayBlockingQueue<Integer> queue;CyclicBarrier barrier;int total;public CustomerThread2(ArrayBlockingQueue<Integer> queue,CyclicBarrier barrier, int total) {super();this.queue = queue;this.barrier = barrier;this.total = total;}public void run() {System.out.println("CustomerThread启动...");for (int i = 0; i < total; i++) {Object o;try {o = queue.take();} catch (InterruptedException e) {e.printStackTrace();}}try {System.out.println("CustomerThread完毕!");barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}



自己实现的同步队列如下:
package thread;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class ProducerCustomerQueue<E> {public static void main(String[] args) throws InterruptedException {/** * 20个线程,存取线程各半,队列大小是30W,耗时:2400左右 */int total = 300000, threadCount = 20, queueSize = 100;ProducerCustomerQueue<Integer> queue = new ProducerCustomerQueue<Integer>(queueSize);final long start = System.currentTimeMillis();CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {public void run() {System.out.println("统计时间:"+ (System.currentTimeMillis() - start));}});for (int i = 0; i < threadCount >>> 1; i++) {new Thread(new ProducerThread(queue, barrier, total)).start();new Thread(new CustomerThread(queue, barrier, total)).start();}}private Object lock = new Object();private final E[] array;private int putIndex;private int takeIndex;private int count;public synchronized int size() {return count;}public ProducerCustomerQueue(int n) {array = (E[]) new Object[n];}/** * 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。 *  * @param o */public boolean offer(E o) {if (o == null) {throw new NullPointerException();}synchronized (lock) {if (count == array.length) {// 队列已满return false;} else {insert(o);return true;}}}/** * 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。 *  * @param o * @throws InterruptedException */public boolean offer(E o, long timeout) throws InterruptedException {if (o == null) {throw new NullPointerException();}synchronized (lock) {if (count == array.length) {// 队列已满lock.wait(timeout);}if (count == array.length) {return false;} else {insert(o);}}return false;}/** * 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。 *  * @param o * @throws InterruptedException */public void put(E o) throws InterruptedException {synchronized (lock) {while (count == array.length) {// 队列已满// System.out.println("put wait");lock.wait();}insert(o);}}private void insert(E o) {array[putIndex] = o;putIndex = (++putIndex == array.length) ? 0 : putIndex;count++;lock.notify();}/** * 获取并移除此队列的头,如果此队列为空,则返回 null。 */public E poll() {synchronized (lock) {if (count == 0) {// 空队列return null;}return extract();}}/** * 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 *  * @throws InterruptedException */public E poll(long timeout) throws InterruptedException {synchronized (lock) {if (count == 0) {// 空队列lock.wait(timeout);}if (count == 0) {return null;} else {return extract();}}}/** * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 *  * @return * @throws InterruptedException */public E take() throws InterruptedException {synchronized (lock) {for (;;) {if (count == 0) {// 空队列lock.wait();} else {return extract();}}}}private E extract() {E o = array[takeIndex];takeIndex = (++takeIndex == array.length) ? 0 : takeIndex;count--;lock.notify();return o;}}// 生产者线程class ProducerThread implements Runnable {ProducerCustomerQueue queue;CyclicBarrier barrier;int total;public ProducerThread(ProducerCustomerQueue queue, CyclicBarrier barrier,int total) {super();this.queue = queue;this.barrier = barrier;this.total = total;}public void run() {System.out.println("ProducerThread启动...");for (int i = 0; i < total; i++) {try {// System.out.println("ProducerThread.." + queue.size());queue.put(i);} catch (InterruptedException e) {e.printStackTrace();}}try {System.out.println("ProducerThread完毕!");barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}// 消费者线程class CustomerThread implements Runnable {ProducerCustomerQueue queue;CyclicBarrier barrier;int total;public CustomerThread(ProducerCustomerQueue queue, CyclicBarrier barrier,int total) {super();this.queue = queue;this.barrier = barrier;this.total = total;}public void run() {System.out.println("CustomerThread启动...");for (int i = 0; i < total; i++) {Object o;try {o = queue.take();} catch (InterruptedException e) {e.printStackTrace();}}try {System.out.println("CustomerThread完毕!");barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}


补充:
为什么synchronized反而比ReentrantLock还要快?这个问题终于自己找到了答案。

jdk.16下synchronized和ReentrantLock的速度差不多,似乎还快一些,但是synchronized的缺点很多 ,参考http://houlinyan.iteye.com/blog/1112535两者的对比。
jdk1.5下ReentrantLock快synchronized接近9-10倍(非专业测试仅供参考)。

热点排行