hama学习W记Q?Q?peers之间通信速度试
昨天下午对HDFS的?度进行了试Q晚上又对Hama的peer间?信?信?度进行了试?/p>
[转蝲引用h明出处:http://blog.csdn.net/bhq2010/article/details/8741647]
软硬件环境:和之前的hdfs试中用的是?的:http://blog.csdn.net/bhq2010/article/details/8740154
hama安裑的是0.6.0版本?/p>
在setupҎ中?Z个master task作ؓ主peer
在bsp中写?个超步,W一个超步读取本地的文gQ并其?分发送给master
master在第二个步中接收到其他peer发过来的消息Q数据)Q将其写入本地的文g中?
有一炚w要注意,应该讑֮bspTask个数为集中节点的个敎ͼ这样通常每个节点上会有且仅有?bspd。少了会使得有些节点上没有bspdQE了会使得?节点上的多个bspd同时d?文gQ然后就挂掉了?
试E序如下Q?/p>
import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.File;import java.io.FileReader;import java.io.FileWriter;import java.io.IOException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hama.HamaConfiguration;import org.apache.hama.bsp.BSP;import org.apache.hama.bsp.BSPJob;import org.apache.hama.bsp.BSPJobClient;import org.apache.hama.bsp.BSPPeer;import org.apache.hama.bsp.ClusterStatus;import org.apache.hama.bsp.FileOutputFormat;import org.apache.hama.bsp.NullInputFormat;import org.apache.hama.bsp.TextOutputFormat;import org.apache.hama.bsp.sync.SyncException;public class HamaTest{private static Path TMP_OUTPUT = new Path("/tmp/pi-"+ System.currentTimeMillis());public static class CommunicationTest extendsBSP<NullWritable, NullWritable, Text, Text, Text>{public static final Log LOG = LogFactory.getLog(CommunicationTest.class);private String masterTask;@Overridepublic void bsp(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)throws IOException, SyncException, InterruptedException{File f = new File("/data/external_links_en.nt");if (f.exists()){int i = 0;FileReader fr = new FileReader("/data/external_links_en.nt");BufferedReader reader = new BufferedReader(fr);String line = null;while ((line = reader.readLine()) != null){i++;if (i > 661700){break;}peer.send(masterTask, new Text(line));}reader.close();}peer.sync();if (peer.getPeerName().equals(masterTask)){Text received;FileWriter fw = new FileWriter("/data/tmpres");BufferedWriter writer = new BufferedWriter(fw);while ((received = peer.getCurrentMessage()) != null){writer.write(received.toString() + "\n");}writer.close();}peer.sync();}@Overridepublic void setup(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)throws IOException{// Choose one as a masterString[] allPeerNames = peer.getAllPeerNames();int port = 0;for (String peerName : allPeerNames){if (peerName.split(":")[0].equals("iir455-200")){if (port == 0|| Integer.parseInt(peerName.split(":")[1]) < port){port = Integer.parseInt(peerName.split(":")[1]);masterTask = peerName;}}}try{peer.sync();} catch (SyncException e){e.printStackTrace();} catch (InterruptedException e){e.printStackTrace();}}@Overridepublic void cleanup(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)throws IOException{}}public static void main(String[] args) throws InterruptedException,IOException, ClassNotFoundException{HamaConfiguration conf = new HamaConfiguration();BSPJob bsp = new BSPJob(conf, HamaTest.class);bsp.setJobName("Connection Speed Test");bsp.setBspClass(CommunicationTest.class);bsp.setInputFormat(NullInputFormat.class);bsp.setOutputKeyClass(Text.class);bsp.setOutputValueClass(Text.class);bsp.setOutputFormat(TextOutputFormat.class);FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);BSPJobClient jobClient = new BSPJobClient(conf);ClusterStatus cluster = jobClient.getClusterStatus(true);if (args.length > 0){bsp.setNumBspTask(Integer.parseInt(args[0]));} else{bsp.setNumBspTask(cluster.getMaxTasks());}long startTime = System.currentTimeMillis();if (bsp.waitForCompletion(true)){System.out.println("Job Finished in "+ (System.currentTimeMillis() - startTime) / 1000.0+ " seconds");}}}
如果Ҏ只发?6170行,则用时在20U左叻I可旉消?主覠在?信上?/p>结Q?/h1>
Hama的peer之间通信速度和健壮?都不理想Q?/p>
1、从六个节点向一个节点传410MB的消息居然h均用?10U,L启动d的大U?0U钟Q其qn的传输?度只?MB/sQ?/p>
2、非常吃内存Q剩余将?GB的内存,f然跑一个几癑օ通信量的Job׃报内存不I当然这也可能是Hama配置的问题,睡完觉了再查查文档;
?还是不覠用hama本n的同步?信功能传递大量的数据Q它只?合在同步计算过程中发送少量的消息?/p>