Hadoop Core 学习笔记(一) SequenceFile文件写入和读取Writable数据
[color=blue]
刚接触Hadoop时，看到SequenceFile和Writable还产生了一点联想，以为是什么神奇的东西。后来才明白，它们不过是Hadoop自己定义的一套IO协议，用于自身的输入输出。这里介绍一下如何向sequence file写入Writable数据，以及如何从中读出。
Writable可以理解为要传输的数据，在Java中它就是一个普通对象，只是放到Hadoop中，需要一套协议对这个对象进行序列化和反序列化，才能传输。于是有了接口中的 public void write(DataOutput out) throws IOException 和 public void readFields(DataInput in) throws IOException 两个方法：前者定义对象如何写出，后者定义对象如何读入。这样，这些对象才能在整个Hadoop集群中无障碍地传递。至于Hadoop为什么要另起炉灶，自己搞一套序列化机制，之前也看过一些介绍，但还没有心得，日后再慢慢领会。
所以这个例子就是自己构造一个Writable对象，先写入sequence file再读出，最后将读出的数据与原始数据进行对比，检查是否正确。具体看代码吧：
1.package com.guoyun.hadoop.io.study;
2.
3.import java.io.DataInput;
4.import java.io.DataOutput;
5.import java.io.IOException;
6.import java.util.ArrayList;
7.import java.util.Collection;
8.import java.util.HashSet;
9.import java.util.List;
10.import java.util.Set;
11.
12.import org.apache.hadoop.conf.Configuration;
13.import org.apache.hadoop.fs.FileSystem;
14.import org.apache.hadoop.fs.Path;
15.import org.apache.hadoop.io.IOUtils;
16.import org.apache.hadoop.io.LongWritable;
17.import org.apache.hadoop.io.SequenceFile;
18.import org.apache.hadoop.io.Writable;
19.import org.apache.hadoop.util.ReflectionUtils;
20.
21.public class SequenceFileStudy {
22.
23. public static class UserWritable implements Writable,Comparable{
24. private long userId;
25. private String userName;
26. private int userAge;
27.
28.
29. public long getUserId() {
30. return userId;
31. }
32.
33. public void setUserId(long userId) {
34. this.userId = userId;
35. }
36.
37. public String getUserName() {
38. return userName;
39. }
40.
41. public void setUserName(String userName) {
42. this.userName = userName;
43. }
44.
45. public int getUserAge() {
46. return userAge;
47. }
48.
49. public void setUserAge(int userAge) {
50. this.userAge = userAge;
51. }
52.
53. public UserWritable(long userId, String userName, int userAge) {
54. super();
55. this.userId = userId;
56. this.userName = userName;
57. this.userAge = userAge;
58. }
59.
60. public UserWritable() {
61. super();
62. }
63.
64. @Override
65. public void write(DataOutput out) throws IOException {
66. out.writeLong(this.userId);
67. out.writeUTF(this.userName);
68. out.writeInt(this.userAge);
69. }
70.
71. @Override
72. public void readFields(DataInput in) throws IOException {
73. this.userId=in.readLong();
74. this.userName=in.readUTF();
75. this.userAge=in.readInt();
76. }
77.
78. @Override
79. public String toString() {
80. return this.userId+"\t"+this.userName+"\t"+this.userAge;
81. }
82.
83. /**
84. * 只对比userId
85. */
86. @Override
87. public boolean equals(Object obj) {
88. if(obj instanceof UserWritable){
89. UserWritable u1=(UserWritable)obj;
90. return u1.getUserId()==this.getUserId();
91. }
92. return false;
93. }
94.
95. /**
96. * 只对比userId
97. */
98. @Override
99. public int compareTo(Object obj) {
100. int result=-1;
101. if(obj instanceof UserWritable){
102. UserWritable u1=(UserWritable)obj;
103. if(this.userId>u1.userId){
104. result=1;
105. }else if(this.userId==u1.userId){
106. result=1;
107. }
108. }
109. return result;
110. }
111.
112. @Override
113. public int hashCode() {
114. return (int)this.userId&Integer.MAX_VALUE;
115. }
116.
117. }
118.
119. /**
120. * 写入到sequence file
121. *
122. * @param filePath
123. * @param conf
124. * @param datas
125. */
126. public static void write2SequenceFile(String filePath,Configuration conf,Collection<UserWritable> datas){
127. FileSystem fs=null;
128. SequenceFile.Writer writer=null;
129. Path path=null;
130. LongWritable idKey=new LongWritable(0);
131.
132. try {
133. fs=FileSystem.get(conf);
134. path=new Path(filePath);
135. writer=SequenceFile.createWriter(fs, conf, path, LongWritable.class, UserWritable.class);
136.
137. for(UserWritable user:datas){
138. idKey.set(user.getUserId()); // userID为Key
139. writer.append(idKey, user);
140. }
141. } catch (IOException e) {
142. // TODO Auto-generated catch block
143. e.printStackTrace();
144. }finally{
145. IOUtils.closeStream(writer);
146. }
147. }
148.
149. /**
150. * 从sequence file文件中读取数据
151. *
152. * @param sequeceFilePath
153. * @param conf
154. * @return
155. */
156. public static List<UserWritable> readSequenceFile(String sequeceFilePath,Configuration conf){
157. List<UserWritable> result=null;
158. FileSystem fs=null;
159. SequenceFile.Reader reader=null;
160. Path path=null;
161. Writable key=null;
162. UserWritable value=new UserWritable();
163.
164. try {
165. fs=FileSystem.get(conf);
166. result=new ArrayList<UserWritable>();
167. path=new Path(sequeceFilePath);
168. reader=new SequenceFile.Reader(fs, path, conf);
169. key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf); // 获得Key,也就是之前写入的userId
170. while(reader.next(key, value)){
171. result.add(value);
172. value=new UserWritable();
173. }
174.
175. } catch (IOException e) {
176. // TODO Auto-generated catch block
177. e.printStackTrace();
178. }catch (Exception e){
179. e.printStackTrace();
180. }finally{
181. IOUtils.closeStream(reader);
182. }
183. return result;
184. }
185.
186. private static Configuration getDefaultConf(){
187. Configuration conf=new Configuration();
188. conf.set("mapred.job.tracker", "local");
189. conf.set("fs.default.name", "file:///");
190. //conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec");
191. return conf;
192. }
193.
194. /**
195. * @param args
196. */
197. public static void main(String[] args) {
198. String filePath="data/user.sequence"; // 文件路径
199. Set<UserWritable> users=new HashSet<UserWritable>();
200. UserWritable user=null;
201. // 生成数据
202. for(int i=1;i<=10;i++){
203. user=new UserWritable(i+(int)(Math.random()*100000),"name-"+(i+1),(int)(Math.random()*50)+10);
204. users.add(user);
205. }
206. // 写入到sequence file
207. write2SequenceFile(filePath,getDefaultConf(),users);
208. //从sequence file中读取
209. List<UserWritable> readDatas=readSequenceFile(filePath,getDefaultConf());
210.
211. // 对比数据是否正确并输出
212. for(UserWritable u:readDatas){
213. if(users.contains(u)){
214. System.out.println(u.toString());
215. }else{
216. System.err.println("Error data:"+u.toString());
217. }
218. }
219.
220. }
221.
222.} [/color]