生产者消费者模式
? ? ? ? ? ? ? ?notifyAll();
? ? ? ? ? ? ? ?return buffer[pos];
?
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ?// System.out.println("Buffer is Empty");
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?wait();
? ? ? ? ? ? ? ?} catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ?// TODO Auto-generated catch block
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? ?}
?
? ? ? ? ? ?}
? ? ? ?return null;
?
? ?}
?
? ?/* 向环形缓冲区中放人一个元素 */
? ?public synchronized void put(Object z) {
?
? ? ? ? ? ?if (n < NMAX) {
? ? ? ? ? ? ? ?buffer[iput] = z;
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?System.out.println("put<--" + buffer[iput]);
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?iput = addring(iput);
? ? ? ? ? ? ? ?n++;
? ? ? ? ? ? ? ?notifyAll();
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ? System.out.println("Buffer is full");
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?wait();
? ? ? ? ? ? ? ? ? ?put(z);//如果满了后,重新执行
? ? ? ? ? ? ? ? ? ?System.out.println("rerun !");
? ? ? ? ? ? ? ?} catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ?// TODO Auto-generated catch block
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
?
? ?}
?
}
?
?
-----------------------------------------------------------------
?
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
?
import org.apache.commons.lang.StringUtils;
?
/**
?* 环形缓冲区 测试
?* @author fengbin
?*
?*/
public class CircularBufTest {
?
?
CircularBuf circularBuf = new CircularBuf(2);
?
class Producer implements Runnable{
?
String apple = "";
Producer(String str){
apple = str;
}
?
@Override
public void run() {
for(int j=0;j<5;j++){
circularBuf.put(apple+j);
System.out.println("!生产:"+apple+j+"结束");
}
?
}
}
?
class Consumer implements Runnable{
?
Consumer(){
}
?
@Override
public void run() {
while(true){
String str=(String) circularBuf.get();
if(StringUtils.isEmpty(str)){
continue;
}
System.out.println("#消费:"+str+"结束");
?
try {
Thread.sleep(500);
} catch (Exception e) {
System.out.println(e);
}
}
}
?
?
}
?
public void test() {
?
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer("P");
Consumer consumer = new Consumer();
service.submit(producer);
service.submit(consumer);
}
?
?
?
public static void main(String[] args) {
?
CircularBufTest test = new CircularBufTest();
test.test();
?
}
?
}
?=================================================
使用阻塞队列实现的生产者和消费者模式.
?
public class TestProducterConsumer {
?
class Producter extends Thread {
Queue q;
?
Producter(Queue q) {
this.q = q;
}
?
public void run() {
for (int i = 0; i < 10; i++) {
q.put(i);
System.out.println("producter :" + i);
}
}
}
?
class Consumer extends Thread{
Queue q;
Consumer(Queue q) {
this.q = q;
}
?
public void run() {
while (true) {
System.out.println("Consumer:" + q.get());
}
}
}
?
class Queue {
int value;
boolean bFull = false;
?
public synchronized void put(int i) {
if (!bFull) {
value = i;
bFull = true;
notify();// 必须用在synchronized
}
try {
wait();// 必须捕获异常
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
?
public synchronized int get() {
if (!bFull)
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
bFull = false;
notify();
return value;
}
?
}
?
public static void main(String[] args) {
TestProducterConsumer con = new TestProducterConsumer();
con.test();
?
}
?
private void test() {
Queue q = new Queue();
Producter p = new Producter(q);
Consumer c = new Consumer(q);
p.start();
c.start();
}
?
}
?