java多线程生产者-消费者模式问题请教
现在公司项目中碰到这样一个需求,需要设计一个类似生产者-消费者的模块,负责数据的分发。
1、入口只有一个:可能是一条一条写入,也可能是一批一批写入。目前设计了一个List暂存这些数据。
2、数据量很大,所以目前我们限制了List的大小,当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。
3、消费者有多个,并且每个消费者都需要消费队列中List中的所有数据。每个消费者是一个线程。
4、当多个消费者线程都消费掉队列中的一个暂存list后,该暂存的list会从队列中清理。
比我们平常讨论的生产者-消费者程序复杂很多。想了很久,没想出思路,请教各位大牛,在线急等。
[解决办法]
LinkedBlockingQueue看下这个类的API
[解决办法]
数据量多大,超过十万级别么
[解决办法]
//用于存储数据的队列
private BlockingQueue<T> queue=new ArrayBlockingQueue<T>(MAXQUEUELENGTH);
//用于暂存队列保存不下的数据的容器
private List<T> tempdata=new LinkedList<T>();
//记录哪些消费者已经消费当前消息的容器
private List<Consumer> currentDataConsumer=new ArrayList<Consumer>();
//消费消息
public T getData(Consumer consumer){
if (currentDataConsumer.contains(consumer))
return null;
if (queue.isEmpty())
return null;
if (currentDataConsumer.size()==0){
System.out.println();
printStatus();
}
currentDataConsumer.add(consumer);
T d=null;
if (MAXCONSUMER==currentDataConsumer.size()){
d=queue.poll();
currentDataConsumer.clear();
copyDataFromTemp();
//System.out.println();
} else {
d=queue.peek();
}
return d;
}
//保存单条消息
public void putData(T d){
if (queue.size()<MAXQUEUELENGTH){
queue.add(d);
} else {
tempdata.add(d);
}
}
//保存多条消息
public void putData(List<T> datas){
while ((queue.size()<MAXQUEUELENGTH)&&datas.size()>0){
queue.add(datas.remove(0));
}
for (int i=0;i<datas.size();i++){
tempdata.add(datas.get(i));
}
}
private void copyDataFromTemp(){
synchronized(tempdata){
if (queue.size()<MAXQUEUELENGTH&&tempdata.size()>0){
for (int i=0;i<min(MAXQUEUELENGTH-queue.size(),tempdata.size());i++){
queue.add(tempdata.remove(i));
}
}
}
}
private int min(int n1,int n2){
return (n1>n2?n2:n1);
}
private void printStatus(){
System.out.println("queue.size()="+queue.size()+"\ttempdata.size()="+tempdata.size());
}
}
/**
* 生产者
*/
class Producer implements Runnable {
static Random rand=new Random();
private final DataKeeper<Data> dataKeeper;
private int producttimes=0;
Producer(DataKeeper<Data> dataKeeper) {
this.dataKeeper = dataKeeper;
}
public void run() {
while (true){
producttimes++;
int count=rand.nextInt(200)+1;
List<Data> datas=new LinkedList<Data>();
for (int i=0;i<count;i++){
datas.add(new Data("第"+producttimes+"次产生的第"+(i+1)+"条数据"));
}
if (datas.size()==1){
dataKeeper.putData(datas.get(0));
} else {
dataKeeper.putData(datas);
}
//dataKeeper.printStatus();
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
class Consumer implements Runnable {
private final DataKeeper<Data> dataKeeper;
private int id;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
Consumer(DataKeeper<Data> dataKeeper,int id) {
this.dataKeeper = dataKeeper;
this.id=id;
}
@Override
public boolean equals(Object o){
if (!(o instanceof Consumer))
return false;
return this.id==((Consumer)o).id;
}
static Random rand=new Random();
public void run() {
while (true) {
Data d=dataKeeper.getData(this);
if (d!=null){
// ToDo 此处是消费代码,本演示只打印消息
System.out.print("["+id+"] ");
}
int sleep=rand.nextInt(30);
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 一个生产者、5个消费者
*/
public class BlockingQueueExample {
public static void main(String[] args) {
DataKeeper<Data> dataKeeper = new DataKeeper<Data>();
Producer p = new Producer(dataKeeper);
new Thread(p).start();
for (int i=1;i<=DataKeeper.MAXCONSUMER;i++){
Consumer c = new Consumer(dataKeeper,i);
new Thread(c).start();
}
}
}