首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

sqoop导入数据地图-reduce job分析

2012-06-28 
sqoop导入数据map-reduce job分析Sqoop 导入数据1、本质上sqoop是一个hadoop的一个jobClient,负责定义hadoo

sqoop导入数据map-reduce job分析
Sqoop 导入数据
1、本质上sqoop是一个hadoop的一个jobClient,负责定义hadoop job,然后将job提交到hadoop集群,只不过这个jobClient为了支持了能通过命令行来配置各种各样的job,做了很多处理。
2、sqoop实现了各种关系型数据库(oracle,DB2,Mysql)等等<->到hadoop(hafs,hbase)的导入导出。Sqoop的导入导出,通过无reduce的map reduce job实现导入导出,每一个map导入导出其中的某一块数据。
为了实现关系型数据库到hadoop job的导入导出,关键是要写好,map-reduce job。
下面介绍下一般的mapreduce job的执行过程。
众所周知,在执行一个Job的时候,Hadoop会将输入的数据划分为,N个Split,然后启动相应的N个Map程序分别来处理它们。
1、数据如何划分?
2、Split如何调度?
3、划分后的数据又如何读取?
这是任何一个map-reduce job来解决的问题。
下图是一个

过一下,运行mapred job的流程
1、jobclient启动在单独一个jvm中,通过Job类配置的配置,定义InputFomat,outputformat,map,reduce等等一系列配置。
2、通过RPC调用,像jobTracke申请一个独一无二的JobID来标示这个Job。
3、JobCilent将Job所需要的资源提交到HDFS中也以JobID命名的目录中。这些资源包括JAR包配置文件、InputSplit等。
4、JobClient向JobTracker提交Job。
5、JobTracker初始化
6、JobTracker从hadfs获取这个Job的split等信息。
7、JobTracker向TaskTracer分配任务
8、TaskTracker从HDFS获取这个Job的相关资源。
9、TaskTracker开启一个新的JVM
10、TaskTracker用新的JVM来执行Map或Reduce。

首先是“数据如何划分”的问题。
在第3步中,JobClient向HDFS提交的资源就包含了InputSplit,这就是数据划分的结果。也就是说,数据划分是在JobClient上完成的。在这里,JobClient会使用指定的InputFormat将输入数据做一次划分,形成若干个Split
InputFormat是一个interface。用户在启动MapReduce的时候需要指定一个InputFormat的implement。InputFormat只包含了两个接口方法:

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;

getSplits就是现在要使用的划分函数。job参数是任务的配置集合,从中可以取到用户在启动MapReduce时指定的输入文件路径。而numSplits参数是一个Split数目的建议值,是否考虑这个值,由具体的InputFormat实现来决定。
返回的是InputSplit数组,它描述了所有的Split信息,一个InputSplit描述一个Split(分块)。

InputSplit也是一个interface,具体返回什么样的implement,这是由具体的InputFormat来决定的。InputSplit也只有两个接口函数:
long getLength() throws IOException;String[] getLocations() throws IOException;

这 个interface仅仅描述了Split有多长,以及存放这个Split的Location信息(也就是这个Split在HDFS上存放的机器。它可能 有多个replication,存在于多台机器上)。除此之外,就再没有任何直接描述Split的信息了。比如:Split对应于哪个文件?在文件中的起 始和结束位置是什么?等等重要的特征都没有描述到。
为什么会这样呢?因为关于Split的那些描述信息,对于MapReduce框架来说是不需要关心的。框架只关心Split的长度(主要用于一些统计信息)和Split的Location(主要用于Split的调度,后面会细说)。
而 Split中真正重要的描述信息还是只有InputFormat会关心。在需要读取一个Split的时候,其对应的InputSplit会被传递到 InputFormat的第二个接口函数getRecordReader,然后被用于初始化一个RecordReader,以解析输入数据。也就是说,描 述Split的重要信息都被隐藏了,只有具体的InputFormat自己知道。它只需要保证getSplits返回的InputSplit和 getRecordReader所关心的InputSplit是同样的implement就行了。这就给InputFormat的实现提供了巨大的灵活性。
我们通过sqoop import job来展开说起,
SQOOP import features
1、Divide table into ranges using primary key max/min
2、Create mappers for each range
3、Mappers write to multiple HDFS nodes
4、Creates text or sequence files
5、Generates Java class for resulting HDFS file
6、Generates Hive definition and auto-loads into HIVE
对于我们sqoop InputFomat 我们从一个sqoop的一个具体的InputFormat说起,
继承结构
DataDrivenDBInputFormat extends DBInputFormat extends InputFormat
作用是从关系型数据库中导入数据到HDFS上,
基本思想:
关系型数据库是面向行的数据库,查询出来的无非是关系的集合,而我们导出数据,就需要,需要分布式地导出,需要把导出记录的集合分块,如何平均地把数据库分块信息,能否平均地分配到各个map当中是sqoop的关键。

DataDrivenDBInputFormat
首先看它的函数:
getsplits()
public List<InputSplit> getSplits(JobContext job) throws IOException {    int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);    if (1 == targetNumTasks) {//如果是一个map则把整个查询作为一块      // There's no need to run a bounding vals query; just return a split      // that separates nothing. This can be considerably more optimal for a      // large table with no index.      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));      return singletonSplit;    }    //得到数据库链接等信息    ResultSet results = null;    Statement statement = null;    Connection connection = getConnection();    try {      statement = connection.createStatement();      String query = getBoundingValsQuery();//得到整体分块的上下界      LOG.info("BoundingValsQuery: " + query);      results = statement.executeQuery(query);      results.next();      // Based on the type of the results, use a different mechanism      // for interpolating split points (i.e., numeric splits, text splits,      // dates, etc.)      int sqlDataType = results.getMetaData().getColumnType(1);//得到分块的基准列的类型      DBSplitter splitter = getSplitter(sqlDataType);//根据基准列类型得到分片器      if (null == splitter) {        throw new IOException("Unknown SQL data type: " + sqlDataType);      }      return splitter.split(job.getConfiguration(), results,          getDBConf().getInputOrderBy());//    } catch (SQLException e) {      throw new IOException(e);    } finally {      // More-or-less ignore SQL exceptions here, but log in case we need it.      try {        if (null != results) {          results.close();        }      } catch (SQLException se) {        LOG.debug("SQLException closing resultset: " + se.toString());      }      try {        if (null != statement) {          statement.close();        }      } catch (SQLException se) {        LOG.debug("SQLException closing statement: " + se.toString());      }      try {        connection.commit();        closeConnection();      } catch (SQLException se) {        LOG.debug("SQLException committing split transaction: "            + se.toString());      }    }  }

具体看spliter:
public List<InputSplit> split(Configuration conf, ResultSet results,      String colName) throws SQLException {    long minVal = results.getLong(1);//得到最大值    long maxVal = results.getLong(2);    String lowClausePrefix = colName + " >= ";    String highClausePrefix = colName + " < ";    int numSplits = ConfigurationHelper.getConfNumMaps(conf);//得到分片总数    if (numSplits < 1) {      numSplits = 1;    }    if (results.getString(1) == null && results.getString(2) == null){      // Range is null to null. Return a null split accordingly.      List<InputSplit> splits = new ArrayList<InputSplit>();      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(          colName + " IS NULL", colName + " IS NULL"));      return splits;    }    // 得到所有的分片点    List<Long> splitPoints = split(numSplits, minVal, maxVal);    if (LOG.isDebugEnabled()) {      LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",          minVal, maxVal, numSplits));      for (int i = 0; i < splitPoints.size(); i++) {        LOG.debug(String.format("%,28d", splitPoints.get(i)));      }    }    List<InputSplit> splits = new ArrayList<InputSplit>();          // Turn the split points into a set of intervals.    long start = splitPoints.get(0);   //具体处理每一个split    for (int i = 1; i < splitPoints.size(); i++) {      long end = splitPoints.get(i);      if (i == splitPoints.size() - 1) {        // This is the last one; use a closed interval.        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(            lowClausePrefix + Long.toString(start),            colName + " <= " + Long.toString(end)));      } else {        // Normal open-interval case.        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(            lowClausePrefix + Long.toString(start),            highClausePrefix + Long.toString(end)));      }      start = end;    }    if (results.getString(1) == null || results.getString(2) == null) {      // At least one extrema is null; add a null split.      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(          colName + " IS NULL", colName + " IS NULL"));    }    return splits;  }

再看看分片的结构:
public static class DataDrivenDBInputSplit      extends DBInputFormat.DBInputSplit {    private String lowerBoundClause;    private String upperBoundClause;

在这里只列出来两个域lowerBoundClause,upperBoundClause,每一个分片由上界where字句和下界where字句限制。


下面再看另一个重要的方法:createDBRecordReader
protected RecordReader<LongWritable, T> createDBRecordReader(      DBInputSplit split, Configuration conf) throws IOException {    DBConfiguration dbConf = getDBConf();    @SuppressWarnings("unchecked")    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());    String dbProductName = getDBProductName();    LOG.debug("Creating db record reader for db product: " + dbProductName);    try {      return new DataDrivenDBRecordReader<T>(split, inputClass,          conf, getConnection(), dbConf, dbConf.getInputConditions(),          dbConf.getInputFieldNames(), dbConf.getInputTableName(),          dbProductName);    } catch (SQLException ex) {      throw new IOException(ex);    }  }

最后返回一个具体DataDrivenDBRecordReader实现RecordReader类,该DataDrivenDBRecordReader,把一个long类型的变量作为key,把一条行记录作为一个Value作为Value,传入map函数进行处理。
这里面有一个小的trick,怎样把关系型数据库的一条记录,映射为java的一个对象呢,由于具体查询哪条数据库哪个字段是不确定的,所以我们需要根据查询的字段动态生成一个一条记录的对应的java对象,即继承SqoopRecord的java对象,
SqoopRecord的类如下所示:
public abstract class SqoopRecord implements Cloneable, DBWritable,    FieldMappable, Writable {  public SqoopRecord() {  }  //parse方法的作用是从特定的格式中转化为java对象  public abstract void parse(CharSequence s) throws RecordParser.ParseError;  public abstract void parse(Text s) throws RecordParser.ParseError;  public abstract void parse(byte [] s) throws RecordParser.ParseError;  public abstract void parse(char [] s) throws RecordParser.ParseError;  public abstract void parse(ByteBuffer s) throws RecordParser.ParseError;  public abstract void parse(CharBuffer s) throws RecordParser.ParseError;  public abstract void loadLargeObjects(LargeObjectLoader objLoader)      throws SQLException, IOException, InterruptedException;  /**   * Inserts the data in this object into the PreparedStatement, starting   * at parameter 'offset'.   * @return the number of fields written to the statement.   */  public abstract int write(PreparedStatement stmt, int offset)      throws SQLException;  /**   * Format output data according to the specified delimiters.   */  public abstract String toString(DelimiterSet delimiters);  /**   * Use the default delimiters, but only append an end-of-record delimiter   * if useRecordDelim is true.   */  public String toString(boolean useRecordDelim) {    // Method body should be overridden by generated classes in 1.3.0+    if (useRecordDelim) {      // This is the existing functionality.      return toString();    } else {      // Setting this to false requires behavior in the generated class.      throw new RuntimeException(          "toString(useRecordDelim=false) requires a newer SqoopRecord. "          + "Please regenerate your record class to use this function.");    }  }  /**   * Format the record according to the specified delimiters. An end-of-record   * delimiter is optional, and only used if useRecordDelim is true. For   * use with TextOutputFormat, calling this with useRecordDelim=false may   * make more sense.   */  public String toString(DelimiterSet delimiters, boolean useRecordDelim) {    if (useRecordDelim) {      return toString(delimiters);    } else {      // Setting this to false requires behavior in the generated class.      throw new RuntimeException(          "toString(delimiters, useRecordDelim=false) requires a newer "          + "SqoopRecord. Please regenerate your record class to use this "          + "function.");    }  }  @Override  public Object clone() throws CloneNotSupportedException {    return super.clone();  }  /**   * Returns an integer specifying which API format version the   * generated class conforms to. Used by internal APIs for backwards   * compatibility.   * @return the API version this class was generated against.   */  public abstract int getClassFormatVersion();  /**   * Use the delegate pattern to allow arbitrary processing of the   * fields of this record.   * @param processor A delegate that operates on this object.   * @throws IOException if the processor encounters an IO error when   * operating on this object.   * @throws ProcessingException if the FieldMapProcessor encounters   * a general processing error when operating on this object.   */  public void delegate(FieldMapProcessor processor)      throws IOException, ProcessingException {    processor.accept(this);  }  @Override  /**   * {@inheriDoc}   * @throws RuntimeException if used with a record that was generated   * before this capability was added (1.1.0).   */  public Map<String, Object> getFieldMap() {    // Default implementation does not support field iteration.    // ClassWriter should provide an overriding version.  throw new RuntimeException(      "Got null field map from record. Regenerate your record class.");  }  /**   * Allows an arbitrary field to be set programmatically to the   * specified value object. The value object must match the   * type expected for the particular field or a RuntimeException   * will result.   * @throws RuntimeException if the specified field name does not exist.   */  public void setField(String fieldName, Object fieldVal) {    throw new RuntimeException("This SqoopRecord does not support setField(). "        + "Regenerate your record class.");  }}

sqoop为我们生成继承SqoopRecord的类的source code,然后再编译打包,最后放入classpath中,然后指定该类为inputclass
至此,我们的map-reducejob算是弄完,具体的map方法很简单,需要需要从sqoopRecord把值取出来,然后写入hdfs即可。

热点排行