DBInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib.db;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.sql.Connection;
  23. import java.sql.PreparedStatement;
  24. import java.sql.ResultSet;
  25. import java.sql.SQLException;
  26. import java.sql.Statement;
  27. import org.apache.hadoop.io.LongWritable;
  28. import org.apache.hadoop.io.Writable;
  29. import org.apache.hadoop.mapred.InputFormat;
  30. import org.apache.hadoop.mapred.InputSplit;
  31. import org.apache.hadoop.mapred.JobConf;
  32. import org.apache.hadoop.mapred.JobConfigurable;
  33. import org.apache.hadoop.mapred.RecordReader;
  34. import org.apache.hadoop.mapred.Reporter;
  35. import org.apache.hadoop.util.ReflectionUtils;
  36. /**
  37.  * A InputFormat that reads input data from an SQL table.
  38.  * <p>
  39.  * DBInputFormat emits LongWritables containing the record number as 
  40.  * key and DBWritables as value. 
  41.  * 
  42.  * The SQL query, and input class can be using one of the two 
  43.  * setInput methods.
  44.  */
  45. public class DBInputFormat<T  extends DBWritable>
  46.   implements InputFormat<LongWritable, T>, JobConfigurable {
  47.   /**
  48.    * A RecordReader that reads records from a SQL table.
  49.    * Emits LongWritables containing the record number as 
  50.    * key and DBWritables as value.  
  51.    */
  52.   protected class DBRecordReader implements
  53.   RecordReader<LongWritable, T> {
  54.     private ResultSet results;
  55.     private Statement statement;
  56.     private Class<T> inputClass;
  57.     private JobConf job;
  58.     private DBInputSplit split;
  59.     private long pos = 0;
  60.     /**
  61.      * @param split The InputSplit to read data for
  62.      * @throws SQLException 
  63.      */
  64.     protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) throws SQLException {
  65.       this.inputClass = inputClass;
  66.       this.split = split;
  67.       this.job = job;
  68.       
  69.       statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
  70.       //statement.setFetchSize(Integer.MIN_VALUE);
  71.       results = statement.executeQuery(getSelectQuery());
  72.     }
  73.     /** Returns the query for selecting the records, 
  74.      * subclasses can override this for custom behaviour.*/
  75.     protected String getSelectQuery() {
  76.       StringBuilder query = new StringBuilder();
  77.       
  78.       if(dbConf.getInputQuery() == null) {
  79.         query.append("SELECT ");
  80.         for (int i = 0; i < fieldNames.length; i++) {
  81.           query.append(fieldNames[i]);
  82.           if(i != fieldNames.length -1) {
  83.             query.append(", ");
  84.           }
  85.         }
  86.         query.append(" FROM ").append(tableName);
  87.         query.append(" AS ").append(tableName); //in hsqldb this is necessary
  88.         if (conditions != null && conditions.length() > 0)
  89.           query.append(" WHERE (").append(conditions).append(")");
  90.         String orderBy = dbConf.getInputOrderBy();
  91.         if(orderBy != null && orderBy.length() > 0) {
  92.           query.append(" ORDER BY ").append(orderBy);
  93.         }
  94.       }
  95.       else {
  96.         query.append(dbConf.getInputQuery());
  97.       }
  98.       try {
  99.         query.append(" LIMIT ").append(split.getLength());
  100.         query.append(" OFFSET ").append(split.getStart());
  101.       }
  102.       catch (IOException ex) {
  103.         //ignore, will not throw
  104.       }
  105.       return query.toString();
  106.     }
  107.     /** {@inheritDoc} */
  108.     public void close() throws IOException {
  109.       try {
  110.         connection.commit();
  111.         results.close();
  112.         statement.close();
  113.       } catch (SQLException e) {
  114.         throw new IOException(e.getMessage());
  115.       }
  116.     }
  117.     /** {@inheritDoc} */
  118.     public LongWritable createKey() {
  119.       return new LongWritable();  
  120.     }
  121.     /** {@inheritDoc} */
  122.     public T createValue() {
  123.       return ReflectionUtils.newInstance(inputClass, job);
  124.     }
  125.     /** {@inheritDoc} */
  126.     public long getPos() throws IOException {
  127.       return pos;
  128.     }
  129.     /** {@inheritDoc} */
  130.     public float getProgress() throws IOException {
  131.       return pos / (float)split.getLength();
  132.     }
  133.     /** {@inheritDoc} */
  134.     public boolean next(LongWritable key, T value) throws IOException {
  135.       try {
  136.         if (!results.next())
  137.           return false;
  138.         // Set the key field value as the output key value
  139.         key.set(pos + split.getStart());
  140.         value.readFields(results);
  141.         pos ++;
  142.       } catch (SQLException e) {
  143.         throw new IOException(e.getMessage());
  144.       }
  145.       return true;
  146.     }
  147.   }
  148.   /**
  149.    * A Class that does nothing, implementing DBWritable
  150.    */
  151.   public static class NullDBWritable implements DBWritable, Writable {
  152.     @Override
  153.     public void readFields(DataInput in) throws IOException { }
  154.     @Override
  155.     public void readFields(ResultSet arg0) throws SQLException { }
  156.     @Override
  157.     public void write(DataOutput out) throws IOException { }
  158.     @Override
  159.     public void write(PreparedStatement arg0) throws SQLException { }
  160.   }
  161.   /**
  162.    * A InputSplit that spans a set of rows
  163.    */
  164.   protected static class DBInputSplit implements InputSplit {
  165.     private long end = 0;
  166.     private long start = 0;
  167.     /**
  168.      * Default Constructor
  169.      */
  170.     public DBInputSplit() {
  171.     }
  172.     /**
  173.      * Convenience Constructor
  174.      * @param start the index of the first row to select
  175.      * @param end the index of the last row to select
  176.      */
  177.     public DBInputSplit(long start, long end) {
  178.       this.start = start;
  179.       this.end = end;
  180.     }
  181.     /** {@inheritDoc} */
  182.     public String[] getLocations() throws IOException {
  183.       // TODO Add a layer to enable SQL "sharding" and support locality
  184.       return new String[] {};
  185.     }
  186.     /**
  187.      * @return The index of the first row to select
  188.      */
  189.     public long getStart() {
  190.       return start;
  191.     }
  192.     /**
  193.      * @return The index of the last row to select
  194.      */
  195.     public long getEnd() {
  196.       return end;
  197.     }
  198.     /**
  199.      * @return The total row count in this split
  200.      */
  201.     public long getLength() throws IOException {
  202.       return end - start;
  203.     }
  204.     /** {@inheritDoc} */
  205.     public void readFields(DataInput input) throws IOException {
  206.       start = input.readLong();
  207.       end = input.readLong();
  208.     }
  209.     /** {@inheritDoc} */
  210.     public void write(DataOutput output) throws IOException {
  211.       output.writeLong(start);
  212.       output.writeLong(end);
  213.     }
  214.   }
  215.   private String conditions;
  216.   private Connection connection;
  217.   private String tableName;
  218.   private String[] fieldNames;
  219.   private DBConfiguration dbConf;
  220.   /** {@inheritDoc} */
  221.   public void configure(JobConf job) {
  222.     dbConf = new DBConfiguration(job);
  223.     try {
  224.       this.connection = dbConf.getConnection();
  225.       this.connection.setAutoCommit(false);
  226.       connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
  227.     }
  228.     catch (Exception ex) {
  229.       throw new RuntimeException(ex);
  230.     }
  231.     tableName = dbConf.getInputTableName();
  232.     fieldNames = dbConf.getInputFieldNames();
  233.     conditions = dbConf.getInputConditions();
  234.   }
  235.   /** {@inheritDoc} */
  236.   @SuppressWarnings("unchecked")
  237.   public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
  238.       JobConf job, Reporter reporter) throws IOException {
  239.     Class inputClass = dbConf.getInputClass();
  240.     try {
  241.       return new DBRecordReader((DBInputSplit) split, inputClass, job);
  242.     }
  243.     catch (SQLException ex) {
  244.       throw new IOException(ex.getMessage());
  245.     }
  246.   }
  247.   /** {@inheritDoc} */
  248.   public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
  249.     try {
  250.       Statement statement = connection.createStatement();
  251.       ResultSet results = statement.executeQuery(getCountQuery());
  252.       results.next();
  253.       long count = results.getLong(1);
  254.       long chunkSize = (count / chunks);
  255.       results.close();
  256.       statement.close();
  257.       InputSplit[] splits = new InputSplit[chunks];
  258.       // Split the rows into n-number of chunks and adjust the last chunk
  259.       // accordingly
  260.       for (int i = 0; i < chunks; i++) {
  261.         DBInputSplit split;
  262.         if ((i + 1) == chunks)
  263.           split = new DBInputSplit(i * chunkSize, count);
  264.         else
  265.           split = new DBInputSplit(i * chunkSize, (i * chunkSize)
  266.               + chunkSize);
  267.         splits[i] = split;
  268.       }
  269.       return splits;
  270.     } catch (SQLException e) {
  271.       throw new IOException(e.getMessage());
  272.     }
  273.   }
  274.   /** Returns the query for getting the total number of rows, 
  275.    * subclasses can override this for custom behaviour.*/
  276.   protected String getCountQuery() {
  277.     
  278.     if(dbConf.getInputCountQuery() != null) {
  279.       return dbConf.getInputCountQuery();
  280.     }
  281.     
  282.     StringBuilder query = new StringBuilder();
  283.     query.append("SELECT COUNT(*) FROM " + tableName);
  284.     if (conditions != null && conditions.length() > 0)
  285.       query.append(" WHERE " + conditions);
  286.     return query.toString();
  287.   }
  288.   /**
  289.    * Initializes the map-part of the job with the appropriate input settings.
  290.    * 
  291.    * @param job The job
  292.    * @param inputClass the class object implementing DBWritable, which is the 
  293.    * Java object holding tuple fields.
  294.    * @param tableName The table to read data from
  295.    * @param conditions The condition which to select data with, eg. '(updated >
  296.    * 20070101 AND length > 0)'
  297.    * @param orderBy the fieldNames in the orderBy clause.
  298.    * @param fieldNames The field names in the table
  299.    * @see #setInput(JobConf, Class, String, String)
  300.    */
  301.   public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
  302.       String tableName,String conditions, String orderBy, String... fieldNames) {
  303.     job.setInputFormat(DBInputFormat.class);
  304.     DBConfiguration dbConf = new DBConfiguration(job);
  305.     dbConf.setInputClass(inputClass);
  306.     dbConf.setInputTableName(tableName);
  307.     dbConf.setInputFieldNames(fieldNames);
  308.     dbConf.setInputConditions(conditions);
  309.     dbConf.setInputOrderBy(orderBy);
  310.   }
  311.   
  312.   /**
  313.    * Initializes the map-part of the job with the appropriate input settings.
  314.    * 
  315.    * @param job The job
  316.    * @param inputClass the class object implementing DBWritable, which is the 
  317.    * Java object holding tuple fields.
  318.    * @param inputQuery the input query to select fields. Example : 
  319.    * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
  320.    * @param inputCountQuery the input query that returns the number of records in
  321.    * the table. 
  322.    * Example : "SELECT COUNT(f1) FROM Mytable"
  323.    * @see #setInput(JobConf, Class, String, String, String, String...)
  324.    */
  325.   public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
  326.       String inputQuery, String inputCountQuery) {
  327.     job.setInputFormat(DBInputFormat.class);
  328.     
  329.     DBConfiguration dbConf = new DBConfiguration(job);
  330.     dbConf.setInputClass(inputClass);
  331.     dbConf.setInputQuery(inputQuery);
  332.     dbConf.setInputCountQuery(inputCountQuery);
  333.     
  334.   }
  335. }