DBOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib.db;
  19. import java.io.IOException;
  20. import java.sql.Connection;
  21. import java.sql.PreparedStatement;
  22. import java.sql.SQLException;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.mapred.JobConf;
  27. import org.apache.hadoop.mapred.OutputFormat;
  28. import org.apache.hadoop.mapred.RecordWriter;
  29. import org.apache.hadoop.mapred.Reporter;
  30. import org.apache.hadoop.util.Progressable;
  31. import org.apache.hadoop.util.StringUtils;
  32. /**
  33.  * A OutputFormat that sends the reduce output to a SQL table.
  34.  * <p> 
  35.  * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
  36.  * key has a type extending DBWritable. Returned {@link RecordWriter} 
  37.  * writes <b>only the key</b> to the database with a batch SQL query.  
  38.  * 
  39.  */
  40. public class DBOutputFormat<K  extends DBWritable, V> 
  41. implements OutputFormat<K,V> {
  42.   private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
  43.   /**
  44.    * A RecordWriter that writes the reduce output to a SQL table
  45.    */
  46.   protected class DBRecordWriter 
  47.   implements RecordWriter<K, V> {
  48.     private Connection connection;
  49.     private PreparedStatement statement;
  50.     protected DBRecordWriter(Connection connection
  51.         , PreparedStatement statement) throws SQLException {
  52.       this.connection = connection;
  53.       this.statement = statement;
  54.       this.connection.setAutoCommit(false);
  55.     }
  56.     /** {@inheritDoc} */
  57.     public void close(Reporter reporter) throws IOException {
  58.       try {
  59.         statement.executeBatch();
  60.         connection.commit();
  61.       } catch (SQLException e) {
  62.         try {
  63.           connection.rollback();
  64.         }
  65.         catch (SQLException ex) {
  66.           LOG.warn(StringUtils.stringifyException(ex));
  67.         }
  68.         throw new IOException(e.getMessage());
  69.       } finally {
  70.         try {
  71.           statement.close();
  72.           connection.close();
  73.         }
  74.         catch (SQLException ex) {
  75.           throw new IOException(ex.getMessage());
  76.         }
  77.       }
  78.     }
  79.     /** {@inheritDoc} */
  80.     public void write(K key, V value) throws IOException {
  81.       try {
  82.         key.write(statement);
  83.         statement.addBatch();
  84.       } catch (SQLException e) {
  85.         e.printStackTrace();
  86.       }
  87.     }
  88.   }
  89.   /**
  90.    * Constructs the query used as the prepared statement to insert data.
  91.    * 
  92.    * @param table
  93.    *          the table to insert into
  94.    * @param fieldNames
  95.    *          the fields to insert into. If field names are unknown, supply an
  96.    *          array of nulls.
  97.    */
  98.   protected String constructQuery(String table, String[] fieldNames) {
  99.     if(fieldNames == null) {
  100.       throw new IllegalArgumentException("Field names may not be null");
  101.     }
  102.     StringBuilder query = new StringBuilder();
  103.     query.append("INSERT INTO ").append(table);
  104.     if (fieldNames.length > 0 && fieldNames[0] != null) {
  105.       query.append(" (");
  106.       for (int i = 0; i < fieldNames.length; i++) {
  107.         query.append(fieldNames[i]);
  108.         if (i != fieldNames.length - 1) {
  109.           query.append(",");
  110.         }
  111.       }
  112.       query.append(")");
  113.     }
  114.     query.append(" VALUES (");
  115.     for (int i = 0; i < fieldNames.length; i++) {
  116.       query.append("?");
  117.       if(i != fieldNames.length - 1) {
  118.         query.append(",");
  119.       }
  120.     }
  121.     query.append(");");
  122.     return query.toString();
  123.   }
  124.   /** {@inheritDoc} */
  125.   public void checkOutputSpecs(FileSystem filesystem, JobConf job)
  126.   throws IOException {
  127.   }
  128.   /** {@inheritDoc} */
  129.   public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
  130.       JobConf job, String name, Progressable progress) throws IOException {
  131.     DBConfiguration dbConf = new DBConfiguration(job);
  132.     String tableName = dbConf.getOutputTableName();
  133.     String[] fieldNames = dbConf.getOutputFieldNames();
  134.     
  135.     try {
  136.       Connection connection = dbConf.getConnection();
  137.       PreparedStatement statement = null;
  138.   
  139.       statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
  140.       return new DBRecordWriter(connection, statement);
  141.     }
  142.     catch (Exception ex) {
  143.       throw new IOException(ex.getMessage());
  144.     }
  145.   }
  146.   /**
  147.    * Initializes the reduce-part of the job with the appropriate output settings
  148.    * 
  149.    * @param job
  150.    *          The job
  151.    * @param tableName
  152.    *          The table to insert data into
  153.    * @param fieldNames
  154.    *          The field names in the table. If unknown, supply the appropriate
  155.    *          number of nulls.
  156.    */
  157.   public static void setOutput(JobConf job, String tableName, String... fieldNames) {
  158.     job.setOutputFormat(DBOutputFormat.class);
  159.     job.setReduceSpeculativeExecution(false);
  160.     DBConfiguration dbConf = new DBConfiguration(job);
  161.     
  162.     dbConf.setOutputTableName(tableName);
  163.     dbConf.setOutputFieldNames(fieldNames);
  164.   }
  165. }