DBCountPageView.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.sql.Connection;
  23. import java.sql.DriverManager;
  24. import java.sql.PreparedStatement;
  25. import java.sql.ResultSet;
  26. import java.sql.SQLException;
  27. import java.sql.Statement;
  28. import java.util.Iterator;
  29. import java.util.Random;
  30. import org.apache.commons.logging.Log;
  31. import org.apache.commons.logging.LogFactory;
  32. import org.apache.hadoop.conf.Configured;
  33. import org.apache.hadoop.io.LongWritable;
  34. import org.apache.hadoop.io.NullWritable;
  35. import org.apache.hadoop.io.Text;
  36. import org.apache.hadoop.io.Writable;
  37. import org.apache.hadoop.mapred.JobClient;
  38. import org.apache.hadoop.mapred.JobConf;
  39. import org.apache.hadoop.mapred.MapReduceBase;
  40. import org.apache.hadoop.mapred.Mapper;
  41. import org.apache.hadoop.mapred.OutputCollector;
  42. import org.apache.hadoop.mapred.Reducer;
  43. import org.apache.hadoop.mapred.Reporter;
  44. import org.apache.hadoop.mapred.lib.LongSumReducer;
  45. import org.apache.hadoop.mapred.lib.db.DBConfiguration;
  46. import org.apache.hadoop.mapred.lib.db.DBInputFormat;
  47. import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
  48. import org.apache.hadoop.mapred.lib.db.DBWritable;
  49. import org.apache.hadoop.util.StringUtils;
  50. import org.apache.hadoop.util.Tool;
  51. import org.apache.hadoop.util.ToolRunner;
  52. import org.hsqldb.Server;
  53. /**
  54.  * This is a demonstrative program, which uses DBInputFormat for reading
  55.  * the input data from a database, and DBOutputFormat for writing the data 
  56.  * to the database. 
  57.  * <br>
  58.  * The Program first creates the necessary tables, populates the input table 
  59.  * and runs the mapred job. 
  60.  * <br> 
  61.  * The input data is a mini access log, with a <code>&lt;url,referrer,time&gt;
  62.  * </code> schema.The output is the number of pageviews of each url in the log, 
  63.  * having the schema <code>&lt;url,pageview&gt;</code>.  
  64.  * 
  65.  * When called with no arguments the program starts a local HSQLDB server, and 
  66.  * uses this database for storing/retrieving the data. 
  67.  */
  68. public class DBCountPageView extends Configured implements Tool {
  69.   private static final Log LOG = LogFactory.getLog(DBCountPageView.class);
  70.   
  71.   private Connection connection;
  72.   private boolean initialized = false;
  73.   private static final String[] AccessFieldNames = {"url", "referrer", "time"};
  74.   private static final String[] PageviewFieldNames = {"url", "pageview"};
  75.   
  76.   private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
  77.   private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
  78.   
  79.   private Server server;
  80.   
  81.   private void startHsqldbServer() {
  82.     server = new Server();
  83.     server.setDatabasePath(0, 
  84.         System.getProperty("test.build.data",".") + "/URLAccess");
  85.     server.setDatabaseName(0, "URLAccess");
  86.     server.start();
  87.   }
  88.   
  89.   private void createConnection(String driverClassName
  90.       , String url) throws Exception {
  91.     
  92.     Class.forName(driverClassName);
  93.     connection = DriverManager.getConnection(url);
  94.     connection.setAutoCommit(false);
  95.   }
  96.   private void shutdown() {
  97.     try {
  98.       connection.commit();
  99.       connection.close();
  100.     }catch (Throwable ex) {
  101.       LOG.warn("Exception occurred while closing connection :"
  102.           + StringUtils.stringifyException(ex));
  103.     } finally {
  104.       try {
  105.         if(server != null) {
  106.           server.shutdown();
  107.         }
  108.       }catch (Throwable ex) {
  109.         LOG.warn("Exception occurred while shutting down HSQLDB :"
  110.             + StringUtils.stringifyException(ex));
  111.       }
  112.     }
  113.   }
  114.   private void initialize(String driverClassName, String url) 
  115.     throws Exception {
  116.     if(!this.initialized) {
  117.       if(driverClassName.equals(DRIVER_CLASS)) {
  118.         startHsqldbServer();
  119.       }
  120.       createConnection(driverClassName, url);
  121.       dropTables();
  122.       createTables();
  123.       populateAccess();
  124.       this.initialized = true;  
  125.     }
  126.   }
  127.   
  128.   private void dropTables() {
  129.     String dropAccess = "DROP TABLE Access";
  130.     String dropPageview = "DROP TABLE Pageview";
  131.     
  132.     try {
  133.       Statement st = connection.createStatement();
  134.       st.executeUpdate(dropAccess);
  135.       st.executeUpdate(dropPageview);
  136.       connection.commit();
  137.       st.close();
  138.     }catch (SQLException ex) {
  139.       //ignore
  140.     }
  141.   }
  142.   
  143.   private void createTables() throws SQLException {
  144.     String createAccess = 
  145.       "CREATE TABLE " +
  146.       "Access(url      VARCHAR(100) NOT NULL," +
  147.             " referrer VARCHAR(100)," +
  148.             " time     BIGINT NOT NULL, " +
  149.             " PRIMARY KEY (url, time))";
  150.     String createPageview = 
  151.       "CREATE TABLE " +
  152.       "Pageview(url      VARCHAR(100) NOT NULL," +
  153.               " pageview     BIGINT NOT NULL, " +
  154.                " PRIMARY KEY (url))";
  155.     
  156.     Statement st = connection.createStatement();
  157.     try {
  158.       st.executeUpdate(createAccess);
  159.       st.executeUpdate(createPageview);
  160.       connection.commit();
  161.     } finally {
  162.       st.close();
  163.     }
  164.   }
  165.   /**
  166.    * Populates the Access table with generated records.
  167.    */
  168.   private void populateAccess() throws SQLException {
  169.     PreparedStatement statement = null ;
  170.     try {
  171.       statement = connection.prepareStatement(
  172.           "INSERT INTO Access(url, referrer, time)" +
  173.           " VALUES (?, ?, ?)");
  174.       Random random = new Random();
  175.       int time = random.nextInt(50) + 50;
  176.       final int PROBABILITY_PRECISION = 100; //  1 / 100 
  177.       final int NEW_PAGE_PROBABILITY  = 15;  //  15 / 100
  178.       //Pages in the site :
  179.       String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
  180.       //linkMatrix[i] is the array of pages(indexes) that page_i links to.  
  181.       int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
  182.           {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
  183.       //a mini model of user browsing a la pagerank
  184.       int currentPage = random.nextInt(pages.length); 
  185.       String referrer = null;
  186.       for(int i=0; i<time; i++) {
  187.         statement.setString(1, pages[currentPage]);
  188.         statement.setString(2, referrer);
  189.         statement.setLong(3, i);
  190.         statement.execute();
  191.         int action = random.nextInt(PROBABILITY_PRECISION);
  192.         //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
  193.         if(action < NEW_PAGE_PROBABILITY) { 
  194.           currentPage = random.nextInt(pages.length); // a random page
  195.           referrer = null;
  196.         }
  197.         else {
  198.           referrer = pages[currentPage];
  199.           action = random.nextInt(linkMatrix[currentPage].length);
  200.           currentPage = linkMatrix[currentPage][action];
  201.         }
  202.       }
  203.       
  204.       connection.commit();
  205.       
  206.     }catch (SQLException ex) {
  207.       connection.rollback();
  208.       throw ex;
  209.     } finally {
  210.       if(statement != null) {
  211.         statement.close();
  212.       }
  213.     }
  214.   }
  215.   
  216.   /**Verifies the results are correct */
  217.   private boolean verify() throws SQLException {
  218.     //check total num pageview
  219.     String countAccessQuery = "SELECT COUNT(*) FROM Access";
  220.     String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
  221.     Statement st = null;
  222.     ResultSet rs = null;
  223.     try {
  224.       st = connection.createStatement();
  225.       rs = st.executeQuery(countAccessQuery);
  226.       rs.next();
  227.       long totalPageview = rs.getLong(1);
  228.       rs = st.executeQuery(sumPageviewQuery);
  229.       rs.next();
  230.       long sumPageview = rs.getLong(1);
  231.       LOG.info("totalPageview=" + totalPageview);
  232.       LOG.info("sumPageview=" + sumPageview);
  233.       return totalPageview == sumPageview && totalPageview != 0;
  234.     }finally {
  235.       if(st != null)
  236.         st.close();
  237.       if(rs != null)
  238.         rs.close();
  239.     }
  240.   }
  241.   
  242.   /** Holds a &lt;url, referrer, time &gt; tuple */
  243.   static class AccessRecord implements Writable, DBWritable {
  244.     String url;
  245.     String referrer;
  246.     long time;
  247.     
  248.     @Override
  249.     public void readFields(DataInput in) throws IOException {
  250.       this.url = Text.readString(in);
  251.       this.referrer = Text.readString(in);
  252.       this.time = in.readLong();
  253.     }
  254.     
  255.     @Override
  256.     public void write(DataOutput out) throws IOException {
  257.       Text.writeString(out, url);
  258.       Text.writeString(out, referrer);
  259.       out.writeLong(time);
  260.     }
  261.     
  262.     @Override
  263.     public void readFields(ResultSet resultSet) throws SQLException {
  264.       this.url = resultSet.getString(1);
  265.       this.referrer = resultSet.getString(2);
  266.       this.time = resultSet.getLong(3);
  267.     }
  268.     @Override
  269.     public void write(PreparedStatement statement) throws SQLException {
  270.       statement.setString(1, url);
  271.       statement.setString(2, referrer);
  272.       statement.setLong(3, time);
  273.     }
  274.   }
  275.   /** Holds a &lt;url, pageview &gt; tuple */
  276.   static class PageviewRecord implements Writable, DBWritable {
  277.     String url;
  278.     long pageview;
  279.    
  280.     public PageviewRecord(String url, long pageview) {
  281.       this.url = url;
  282.       this.pageview = pageview;
  283.     }
  284.     
  285.     @Override
  286.     public void readFields(DataInput in) throws IOException {
  287.       this.url = Text.readString(in);
  288.       this.pageview = in.readLong();
  289.     }
  290.     @Override
  291.     public void write(DataOutput out) throws IOException {
  292.       Text.writeString(out, url);
  293.       out.writeLong(pageview);
  294.     }
  295.     @Override
  296.     public void readFields(ResultSet resultSet) throws SQLException {
  297.       this.url = resultSet.getString(1);
  298.       this.pageview = resultSet.getLong(2);
  299.     }
  300.     @Override
  301.     public void write(PreparedStatement statement) throws SQLException {
  302.       statement.setString(1, url);
  303.       statement.setLong(2, pageview);
  304.     }
  305.     @Override
  306.     public String toString() {
  307.       return url + " " + pageview;
  308.     }
  309.   }
  310.   
  311.   /**
  312.    * Mapper extracts URLs from the AccessRecord (tuples from db), 
  313.    * and emits a &lt;url,1&gt; pair for each access record. 
  314.    */
  315.   static class PageviewMapper extends MapReduceBase 
  316.     implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
  317.     
  318.     LongWritable ONE = new LongWritable(1L);
  319.     @Override
  320.     public void map(LongWritable key, AccessRecord value,
  321.         OutputCollector<Text, LongWritable> output, Reporter reporter)
  322.         throws IOException {
  323.       
  324.       Text oKey = new Text(value.url);
  325.       output.collect(oKey, ONE);
  326.     }
  327.   }
  328.   
  329.   /**
  330.    * Reducer sums up the pageviews and emits a PageviewRecord, 
  331.    * which will correspond to one tuple in the db.
  332.    */
  333.   static class PageviewReducer extends MapReduceBase 
  334.     implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
  335.     
  336.     NullWritable n = NullWritable.get();
  337.     @Override
  338.     public void reduce(Text key, Iterator<LongWritable> values,
  339.         OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
  340.         throws IOException {
  341.       
  342.       long sum = 0L;
  343.       while(values.hasNext()) {
  344.         sum += values.next().get();
  345.       }
  346.       output.collect(new PageviewRecord(key.toString(), sum), n);
  347.     }
  348.   }
  349.   
  350.   @Override
  351.   //Usage DBCountPageView [driverClass dburl]
  352.   public int run(String[] args) throws Exception {
  353.     
  354.     String driverClassName = DRIVER_CLASS;
  355.     String url = DB_URL;
  356.     
  357.     if(args.length > 1) {
  358.       driverClassName = args[0];
  359.       url = args[1];
  360.     }
  361.     
  362.     initialize(driverClassName, url);
  363.     JobConf job = new JobConf(getConf(), DBCountPageView.class);
  364.         
  365.     job.setJobName("Count Pageviews of URLs");
  366.     job.setMapperClass(PageviewMapper.class);
  367.     job.setCombinerClass(LongSumReducer.class);
  368.     job.setReducerClass(PageviewReducer.class);
  369.     DBConfiguration.configureDB(job, driverClassName, url);
  370.     
  371.     DBInputFormat.setInput(job, AccessRecord.class, "Access"
  372.         , null, "url", AccessFieldNames);
  373.     DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
  374.     
  375.     job.setMapOutputKeyClass(Text.class);
  376.     job.setMapOutputValueClass(LongWritable.class);
  377.     job.setOutputKeyClass(PageviewRecord.class);
  378.     job.setOutputValueClass(NullWritable.class);
  379.     try {
  380.       JobClient.runJob(job);
  381.       
  382.       boolean correct = verify();
  383.       if(!correct) {
  384.         throw new RuntimeException("Evaluation was not correct!");
  385.       }
  386.     } finally {
  387.       shutdown();    
  388.     }
  389.     return 0;
  390.   }
  391.   public static void main(String[] args) throws Exception {
  392.     int ret = ToolRunner.run(new DBCountPageView(), args);
  393.     System.exit(ret);
  394.   }
  395. }