首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > JAVA > J2SE开发 >

java多线程生产者-消费者模式有关问题请问

2013-02-18 
java多线程生产者-消费者模式问题请教现在公司项目中碰到这样一个需求,需要设计一个类似生产者-消费者的模

java多线程生产者-消费者模式问题请教
现在公司项目中碰到这样一个需求,需要设计一个类似生产者-消费者的模块,负责数据的分发。
1、入口只有一个:可能是一条一条写入,也可能是一批一批写入。目前设计了一个List暂存这些数据。
2、数据量很大,所以目前我们限制了List的大小,当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。
3、消费者有多个,并且每个消费者都需要消费队列中List中的所有数据。每个消费者是一个线程。
4、当多个消费者线程都消费掉队列中的一个暂存list后,该暂存的list会从队列中清理。

比我们平常讨论的生产者-消费者程序复杂很多。想了很久,没想出思路,请教各位大牛,在线急等。

[解决办法]
LinkedBlockingQueue看下这个类的API
[解决办法]
数据量多大,超过十万级别么
[解决办法]

引用:
LinkedBlockingQueue看下这个类的API


+1
[解决办法]
同1楼,也觉得BlockingQueue可以

楼主,如果是考虑到内存问题才使用暂存List的话,不是等于没解决问题么?
[解决办法]
我也有类似的场景,我准备使用BlockingQueue
[解决办法]
楼主的有一点描述看不懂,为什么要“当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。”?
数据你是可以放到多个List中,但是都是在使用内存。使用一个还是多个List,都耗费几乎同样多的内存。
也就是说你这样设计数据Keeper是画蛇添足,完全没有必要的。除非你把多的数据放到文件或者数据库中,这样设计才有意义。
你可能是担心ArrayList每次删掉前面的数据时,后面的数据会搬动。但是实际上这样老是需要在前面删除数据,在后面加入数据的应用需求,你本来就不能用ArrayList而应该用LinkedList。
[解决办法]
如果你对以上我说的没有异议,那么我可以给你出一个方案:
1、使用BlockingQueue作为数据容器。
2、数据中增加一个属性,用于保存使用过该数据的消费者
3、消费者每次都从BlockingQueue取数据,当发现该数据已经使用过了,就等待其他消费者使用完该数据
4、当最后一个消费者消费完该数据时,把这个数据从BlockingQueue中remove
[解决办法]
貌似楼主的资源一个只能被消费一次,如有资源A,B,C,如果消费者I消费了A,那就只剩下B,C,消费者II只能去消费B,C了,是这样的吗?

引用:
如果你对以上我说的没有异议,那么我可以给你出一个方案:
1、使用BlockingQueue作为数据容器。
2、数据中增加一个属性,用于保存使用过该数据的消费者
3、消费者每次都从BlockingQueue取数据,当发现该数据已经使用过了,就等待其他消费者使用完该数据
4、当最后一个消费者消费完该数据时,把这个数据从BlockingQueue中remove

[解决办法]
我好像看错了。。。

引用:
貌似楼主的资源一个只能被消费一次,如有资源A,B,C,如果消费者I消费了A,那就只剩下B,C,消费者II只能去消费B,C了,是这样的吗?


引用:
如果你对以上我说的没有异议,那么我可以给你出一个方案:
1、使用BlockingQueue作为数据容器。
2、数据中增加一个属性,用于保存使用过该数据的消费者
3、消费者每次都从BlockingQueue取数据,当发现该……

[解决办法]
我觉得楼主应该说清楚需求越详细越好。
你描述是自己的解决方案,俺们都不是很清楚的需求是什么。
一知半解
[解决办法]
写了个例子,看看合适不合适:

package csdn;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * data
 */
class Data{
public Data(String message){
this.message=message;
}

private String message;

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}
/**
 * Data Keeper
 */
class DataKeeper<T>{
//最大队列长度
public static final int MAXQUEUELENGTH=100;
//消费者个数
public static final int MAXCONSUMER=5;



//用于存储数据的队列
private BlockingQueue<T> queue=new ArrayBlockingQueue<T>(MAXQUEUELENGTH);
//用于暂存队列保存不下的数据的容器
private List<T> tempdata=new LinkedList<T>();

//记录哪些消费者已经消费当前消息的容器
private List<Consumer> currentDataConsumer=new ArrayList<Consumer>();

//消费消息
public T getData(Consumer consumer){
if (currentDataConsumer.contains(consumer))
return null;
if (queue.isEmpty())
return null;

if (currentDataConsumer.size()==0){
System.out.println();
printStatus();
}
currentDataConsumer.add(consumer);

T d=null;
if (MAXCONSUMER==currentDataConsumer.size()){
d=queue.poll();
currentDataConsumer.clear();
copyDataFromTemp();
//System.out.println();

} else {
d=queue.peek();
}

return d;
}

//保存单条消息
public void putData(T d){
if (queue.size()<MAXQUEUELENGTH){
queue.add(d);
} else {
tempdata.add(d);
}
}

//保存多条消息
public void putData(List<T> datas){
while ((queue.size()<MAXQUEUELENGTH)&&datas.size()>0){
queue.add(datas.remove(0));
}
for (int i=0;i<datas.size();i++){
tempdata.add(datas.get(i));
}
}

private void copyDataFromTemp(){
synchronized(tempdata){
if (queue.size()<MAXQUEUELENGTH&&tempdata.size()>0){
for (int i=0;i<min(MAXQUEUELENGTH-queue.size(),tempdata.size());i++){
queue.add(tempdata.remove(i));
}
}
}
}

private int min(int n1,int n2){
return (n1>n2?n2:n1);
}

private void printStatus(){
System.out.println("queue.size()="+queue.size()+"\ttempdata.size()="+tempdata.size());
}
}

/**
 * 生产者
 */
class Producer implements Runnable {
static Random rand=new Random();
private final DataKeeper<Data> dataKeeper;

private int producttimes=0;
Producer(DataKeeper<Data> dataKeeper) {
this.dataKeeper = dataKeeper;
}

public void run() {
while (true){
producttimes++;
int count=rand.nextInt(200)+1;
List<Data> datas=new LinkedList<Data>();
for (int i=0;i<count;i++){
datas.add(new Data("第"+producttimes+"次产生的第"+(i+1)+"条数据"));
}
if (datas.size()==1){
dataKeeper.putData(datas.get(0));
} else {
dataKeeper.putData(datas);
}
//dataKeeper.printStatus();
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

/**
 * 消费者
 */
class Consumer implements Runnable {
private final DataKeeper<Data> dataKeeper;
private int id;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
Consumer(DataKeeper<Data> dataKeeper,int id) {
this.dataKeeper = dataKeeper;
this.id=id;
}
@Override
public boolean equals(Object o){


if (!(o instanceof Consumer))
return false;

return this.id==((Consumer)o).id;
}

static Random rand=new Random();

public void run() {
while (true) {
Data d=dataKeeper.getData(this);
if (d!=null){
// ToDo 此处是消费代码,本演示只打印消息
System.out.print("["+id+"] ");
}

int sleep=rand.nextInt(30);
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

/**
 * 一个生产者、5个消费者
 */
public class BlockingQueueExample {
public static void main(String[] args) {
DataKeeper<Data> dataKeeper = new DataKeeper<Data>();
Producer p = new Producer(dataKeeper);
new Thread(p).start();

for (int i=1;i<=DataKeeper.MAXCONSUMER;i++){
Consumer c = new Consumer(dataKeeper,i);
new Thread(c).start();
}
}

}

热点排行