Analysis of the map-reduce job behind a Sqoop data import
Sqoop data import
1. Essentially, Sqoop is a Hadoop job client: it defines a Hadoop job and submits it to the Hadoop cluster. The difference is that this job client does a great deal of extra work so that all kinds of jobs can be configured from the command line.
2. Sqoop implements import and export between various relational databases (Oracle, DB2, MySQL, and so on) and Hadoop (HDFS, HBase). Both directions are implemented as map-reduce jobs with no reduce phase; each map task imports or exports one slice of the data.
To move data between a relational database and Hadoop, the key is therefore to write the map-reduce job well.
The following walks through how a typical MapReduce job executes.
As is well known, when a job runs, Hadoop divides the input data into N splits and then starts N corresponding map tasks to process them.
1. How is the data divided?
2. How are the splits scheduled?
3. How is the divided data actually read?
These are questions every map-reduce job has to answer.
Let's walk through the flow of running a MapReduce job:
1. The JobClient starts in its own JVM and, through the configuration built up on the Job object, defines the InputFormat, OutputFormat, Mapper, Reducer, and a series of other settings (see the sketch after this list).
2. Through an RPC call, it asks the JobTracker for a unique JobID that identifies this job.
3. The JobClient uploads the resources the job needs to an HDFS directory named after the JobID. These resources include the JAR, the configuration files, the InputSplits, and so on.
4. The JobClient submits the job to the JobTracker.
5. The JobTracker initializes the job.
6. The JobTracker fetches the job's splits and other information from HDFS.
7. The JobTracker assigns tasks to the TaskTrackers.
8. Each TaskTracker fetches the job's resources from HDFS.
9. The TaskTracker launches a new JVM.
10. The TaskTracker runs the map or reduce task in that new JVM.
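To make step 1 concrete, here is a minimal sketch of the job-client side of a map-only job using the plain Hadoop API. It is illustrative only, not Sqoop's actual code; the mapper, paths, and class names are assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Illustrative map-only job wired up the way steps 1-4 describe; not Sqoop code.
public class MapOnlyJobSketch {

  // Trivial identity-style mapper standing in for the real map logic.
  public static class IdentityMapper
      extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(key.toString()), value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "map-only-sketch");      // step 1: define the job
    job.setJarByClass(MapOnlyJobSketch.class);
    job.setMapperClass(IdentityMapper.class);
    job.setNumReduceTasks(0);                        // no reduce phase, like a Sqoop import
    job.setInputFormatClass(TextInputFormat.class);  // decides how the input is split and read
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/in"));    // assumed input path
    FileOutputFormat.setOutputPath(job, new Path("/tmp/out")); // assumed output path
    // Steps 2-4: ask for a JobID, ship resources to HDFS, and submit.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Setting the number of reduce tasks to zero is exactly what Sqoop does for its import jobs: the map output is written straight through the OutputFormat.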
First, the question of how the data is divided.
In step 3, the resources the JobClient uploads to HDFS already include the InputSplits, which are the result of dividing the data. In other words, the division happens on the JobClient: it uses the InputFormat specified for the job to partition the input data into a number of splits.
InputFormat is an interface; when launching a MapReduce job, the user has to specify an InputFormat implementation. It declares only two methods:
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
getSplits() returns the splits. InputSplit is itself an interface; an implementation only has to report how much data the split covers and where that data lives:
long getLength() throws IOException;
String[] getLocations() throws IOException;
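For orientation, here is a minimal sketch of a custom split, my own example rather than anything from Sqoop (in the newer mapreduce API, InputSplit is an abstract class, but the idea is identical). A split only needs to describe its size, its preferred locations, and how to serialize itself so the JobClient can ship it to HDFS:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Minimal example split covering an abstract offset range; not Sqoop code.
public class RangeSplit extends InputSplit implements Writable {
  private long start;
  private long length;

  public RangeSplit() { }                      // required for deserialization
  public RangeSplit(long start, long length) {
    this.start = start;
    this.length = length;
  }

  @Override
  public long getLength() { return length; }   // how much data this split covers

  @Override
  public String[] getLocations() {
    return new String[0];                      // no locality preference, like a DB split
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(start);
    out.writeLong(length);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    start = in.readLong();
    length = in.readLong();
  }
}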
Sqoop uses the newer org.apache.hadoop.mapreduce API, where InputFormat is an abstract class with the same two responsibilities. The InputFormat behind an import is DataDrivenDBInputFormat, and its getSplits() answers the question of how the data is divided:

public List<InputSplit> getSplits(JobContext job) throws IOException {
    int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
    if (1 == targetNumTasks) {
      // Only one map task: treat the whole query as a single split.
      // There's no need to run a bounding vals query; just return a split
      // that separates nothing. This can be considerably more optimal for a
      // large table with no index.
      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
      singletonSplit.add(new DataDrivenDBInputSplit("1=1", "1=1"));
      return singletonSplit;
    }

    // Get the database connection and related information.
    ResultSet results = null;
    Statement statement = null;
    Connection connection = getConnection();
    try {
      statement = connection.createStatement();

      String query = getBoundingValsQuery(); // query the overall lower/upper bound
      LOG.info("BoundingValsQuery: " + query);

      results = statement.executeQuery(query);
      results.next();

      // Based on the type of the results, use a different mechanism
      // for interpolating split points (i.e., numeric splits, text splits,
      // dates, etc.)
      int sqlDataType = results.getMetaData().getColumnType(1); // type of the split-by column
      DBSplitter splitter = getSplitter(sqlDataType);           // choose a splitter for that type
      if (null == splitter) {
        throw new IOException("Unknown SQL data type: " + sqlDataType);
      }

      return splitter.split(job.getConfiguration(), results,
          getDBConf().getInputOrderBy());
    } catch (SQLException e) {
      throw new IOException(e);
    } finally {
      // More-or-less ignore SQL exceptions here, but log in case we need it.
      try {
        if (null != results) {
          results.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing resultset: " + se.toString());
      }
      try {
        if (null != statement) {
          statement.close();
        }
      } catch (SQLException se) {
        LOG.debug("SQLException closing statement: " + se.toString());
      }
      try {
        connection.commit();
        closeConnection();
      } catch (SQLException se) {
        LOG.debug("SQLException committing split transaction: " + se.toString());
      }
    }
  }
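For orientation, the bounding values query built by getBoundingValsQuery() is essentially a MIN/MAX over the split-by column. With a hypothetical table orders split by its id column it would look roughly like this; row 1 of the result then supplies minVal (column 1) and maxVal (column 2) to the splitter below.

// Hypothetical illustration; the table "orders" and column "id" are assumed names.
String boundingValsQuery = "SELECT MIN(id), MAX(id) FROM orders";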
The splitter chosen for integer columns is IntegerSplitter; its split() method turns the min/max values into a list of interval conditions, one per map task:

public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException {

    long minVal = results.getLong(1);   // minimum value of the split-by column
    long maxVal = results.getLong(2);   // maximum value of the split-by column

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    int numSplits = ConfigurationHelper.getConfNumMaps(conf); // total number of splits
    if (numSplits < 1) {
      numSplits = 1;
    }

    if (results.getString(1) == null && results.getString(2) == null) {
      // Range is null to null. Return a null split accordingly.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return splits;
    }

    // Compute all the split points.
    List<Long> splitPoints = split(numSplits, minVal, maxVal);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",
          minVal, maxVal, numSplits));
      for (int i = 0; i < splitPoints.size(); i++) {
        LOG.debug(String.format("%,28d", splitPoints.get(i)));
      }
    }
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals, one split per interval.
    long start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      long end = splitPoints.get(i);

      if (i == splitPoints.size() - 1) {
        // This is the last one; use a closed interval.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + Long.toString(start),
            colName + " <= " + Long.toString(end)));
      } else {
        // Normal open-interval case.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + Long.toString(start),
            highClausePrefix + Long.toString(end)));
      }

      start = end;
    }

    if (results.getString(1) == null || results.getString(2) == null) {
      // At least one extrema is null; add a null split.
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
    }

    return splits;
  }
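To see what this produces, here is a small self-contained sketch, my own simplification rather than Sqoop's code, that mimics the interval construction for an assumed range of 0 to 1000 on a column named id with 4 map tasks:

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the interval construction above; the range 0..1000,
// the column name "id", and the 4 map tasks are all assumptions.
public class IntervalSketch {
  public static void main(String[] args) {
    long min = 0, max = 1000;
    int numSplits = 4;
    String col = "id";
    long step = (max - min) / numSplits;

    List<String> conditions = new ArrayList<String>();
    long start = min;
    for (int i = 1; i <= numSplits; i++) {
      long end = (i == numSplits) ? max : start + step;
      // Last interval is closed; the others are half-open, as in the code above.
      String upper = (i == numSplits) ? col + " <= " + end : col + " < " + end;
      conditions.add(col + " >= " + start + " AND " + upper);
      start = end;
    }
    for (String c : conditions) {
      System.out.println(c);
    }
    // Prints:
    //   id >= 0 AND id < 250
    //   id >= 250 AND id < 500
    //   id >= 500 AND id < 750
    //   id >= 750 AND id <= 1000
  }
}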
Each element returned is a DataDrivenDBInputSplit, which carries nothing more than two SQL condition strings bounding the rows its map task will read:

public static class DataDrivenDBInputSplit extends DBInputFormat.DBInputSplit {
  private String lowerBoundClause;
  private String upperBoundClause;
  // (constructors, Writable serialization, and accessors omitted)
}
That covers how the data is divided. The third question, how each split's data is actually read, falls to the RecordReader; DataDrivenDBInputFormat creates one per split:

protected RecordReader<LongWritable, T> createDBRecordReader(
      DBInputSplit split, Configuration conf) throws IOException {

    DBConfiguration dbConf = getDBConf();
    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    String dbProductName = getDBProductName();

    LOG.debug("Creating db record reader for db product: " + dbProductName);

    try {
      return new DataDrivenDBRecordReader<T>(split, inputClass, conf,
          getConnection(), dbConf, dbConf.getInputConditions(),
          dbConf.getInputFieldNames(), dbConf.getInputTableName(),
          dbProductName);
    } catch (SQLException ex) {
      throw new IOException(ex);
    }
  }
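The record reader then combines its split's two bound clauses into the SELECT it runs against the database. A rough sketch of the idea follows; the table, columns, and bounds are assumptions, and this is not Sqoop's exact query-building code:

// Rough sketch of the per-split query; names and bounds are invented.
public class SplitQuerySketch {
  public static void main(String[] args) {
    String lowerClause = "id >= 250";   // lower bound clause from the split
    String upperClause = "id < 500";    // upper bound clause from the split
    String query = "SELECT id, name, price FROM orders"
        + " WHERE ( " + lowerClause + " ) AND ( " + upperClause + " )";
    System.out.println(query);
    // The record reader executes a query of this shape and feeds each row of
    // the ResultSet to its map task as one record.
  }
}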
The inputClass handed to the record reader is a table-specific class that Sqoop generates; every generated class extends SqoopRecord, which defines how a row is parsed from text, written back to a database, and formatted with delimiters:

public abstract class SqoopRecord implements Cloneable, DBWritable,
    FieldMappable, Writable {

  public SqoopRecord() {
  }

  // The parse methods convert data from an external, delimited representation
  // into this Java object.
  public abstract void parse(CharSequence s) throws RecordParser.ParseError;
  public abstract void parse(Text s) throws RecordParser.ParseError;
  public abstract void parse(byte [] s) throws RecordParser.ParseError;
  public abstract void parse(char [] s) throws RecordParser.ParseError;
  public abstract void parse(ByteBuffer s) throws RecordParser.ParseError;
  public abstract void parse(CharBuffer s) throws RecordParser.ParseError;

  public abstract void loadLargeObjects(LargeObjectLoader objLoader)
      throws SQLException, IOException, InterruptedException;

  /**
   * Inserts the data in this object into the PreparedStatement, starting
   * at parameter 'offset'.
   * @return the number of fields written to the statement.
   */
  public abstract int write(PreparedStatement stmt, int offset)
      throws SQLException;

  /**
   * Format output data according to the specified delimiters.
   */
  public abstract String toString(DelimiterSet delimiters);

  /**
   * Use the default delimiters, but only append an end-of-record delimiter
   * if useRecordDelim is true.
   */
  public String toString(boolean useRecordDelim) {
    // Method body should be overridden by generated classes in 1.3.0+
    if (useRecordDelim) {
      // This is the existing functionality.
      return toString();
    } else {
      // Setting this to false requires behavior in the generated class.
      throw new RuntimeException(
          "toString(useRecordDelim=false) requires a newer SqoopRecord. "
          + "Please regenerate your record class to use this function.");
    }
  }

  /**
   * Format the record according to the specified delimiters. An end-of-record
   * delimiter is optional, and only used if useRecordDelim is true. For
   * use with TextOutputFormat, calling this with useRecordDelim=false may
   * make more sense.
   */
  public String toString(DelimiterSet delimiters, boolean useRecordDelim) {
    if (useRecordDelim) {
      return toString(delimiters);
    } else {
      // Setting this to false requires behavior in the generated class.
      throw new RuntimeException(
          "toString(delimiters, useRecordDelim=false) requires a newer "
          + "SqoopRecord. Please regenerate your record class to use this "
          + "function.");
    }
  }

  @Override
  public Object clone() throws CloneNotSupportedException {
    return super.clone();
  }

  /**
   * Returns an integer specifying which API format version the
   * generated class conforms to. Used by internal APIs for backwards
   * compatibility.
   * @return the API version this class was generated against.
   */
  public abstract int getClassFormatVersion();

  /**
   * Use the delegate pattern to allow arbitrary processing of the
   * fields of this record.
   * @param processor A delegate that operates on this object.
   * @throws IOException if the processor encounters an IO error when
   * operating on this object.
   * @throws ProcessingException if the FieldMapProcessor encounters
   * a general processing error when operating on this object.
   */
  public void delegate(FieldMapProcessor processor)
      throws IOException, ProcessingException {
    processor.accept(this);
  }

  @Override
  /**
   * {@inheritDoc}
   * @throws RuntimeException if used with a record that was generated
   * before this capability was added (1.1.0).
   */
  public Map<String, Object> getFieldMap() {
    // Default implementation does not support field iteration.
    // ClassWriter should provide an overriding version.
    throw new RuntimeException(
        "Got null field map from record. Regenerate your record class.");
  }

  /**
   * Allows an arbitrary field to be set programmatically to the
   * specified value object. The value object must match the
   * type expected for the particular field or a RuntimeException
   * will result.
   * @throws RuntimeException if the specified field name does not exist.
   */
  public void setField(String fieldName, Object fieldVal) {
    throw new RuntimeException("This SqoopRecord does not support setField(). "
        + "Regenerate your record class.");
  }
}
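To show how these methods fit together, here is a hypothetical usage sketch. The record instance would come from a class that the sqoop codegen tool generates for a specific table; the delimited line and field name are invented, and the SqoopRecord package shown in the import varies by Sqoop version:

import com.cloudera.sqoop.lib.SqoopRecord;  // package differs by Sqoop version
import org.apache.hadoop.io.Text;

// Hypothetical usage sketch: "rec" stands in for an instance of a
// sqoop-generated subclass of SqoopRecord; the data is invented.
public class SqoopRecordUsageSketch {
  public static void demo(SqoopRecord rec) throws Exception {
    rec.parse(new Text("42,beer,3.50"));   // delimited text -> Java fields
    rec.setField("price", 3.75d);          // programmatic access to one field
    System.out.println(rec.toString());    // Java fields -> delimited text
  }
}

Because a single generated class can both parse delimited text and write itself into a PreparedStatement, the same class serves import and export jobs alike.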