DBCountPageView.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.hsqldb.Server;
/**
 * This is a demonstrative program, which uses DBInputFormat for reading
 * the input data from a database, and DBOutputFormat for writing the data
 * to the database.
 * <br>
 * The program first creates the necessary tables, populates the input table,
 * and runs the MapReduce job.
 * <br>
 * The input data is a mini access log with a
 * <code>&lt;url,referrer,time&gt;</code> schema. The output is the number of
 * pageviews of each url in the log, with the schema
 * <code>&lt;url,pageview&gt;</code>.
 *
 * When called with no arguments, the program starts a local HSQLDB server and
 * uses this database for storing/retrieving the data.
 */
public class DBCountPageView extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(DBCountPageView.class);

  private Connection connection;
  private boolean initialized = false;

  private static final String[] AccessFieldNames = {"url", "referrer", "time"};
  private static final String[] PageviewFieldNames = {"url", "pageview"};

  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";

  private Server server;

  private void startHsqldbServer() {
    server = new Server();
    server.setDatabasePath(0,
        System.getProperty("test.build.data", ".") + "/URLAccess");
    server.setDatabaseName(0, "URLAccess");
    server.start();
  }
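
  // When run with no arguments, the Server started above listens on HSQLDB's
  // default port, which is what the DB_URL constant
  // ("jdbc:hsqldb:hsql://localhost/URLAccess") points at.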

  private void createConnection(String driverClassName,
      String url) throws Exception {
    Class.forName(driverClassName);
    connection = DriverManager.getConnection(url);
    connection.setAutoCommit(false);
  }
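
  // Auto-commit is disabled above, so each DDL/DML batch below ends with an
  // explicit connection.commit(); shutdown() issues a final commit as well.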

  private void shutdown() {
    try {
      connection.commit();
      connection.close();
    } catch (Throwable ex) {
      LOG.warn("Exception occurred while closing connection :"
          + StringUtils.stringifyException(ex));
    } finally {
      try {
        if (server != null) {
          server.shutdown();
        }
      } catch (Throwable ex) {
        LOG.warn("Exception occurred while shutting down HSQLDB :"
            + StringUtils.stringifyException(ex));
      }
    }
  }

  private void initialize(String driverClassName, String url)
      throws Exception {
    if (!this.initialized) {
      if (driverClassName.equals(DRIVER_CLASS)) {
        startHsqldbServer();
      }
      createConnection(driverClassName, url);
      dropTables();
      createTables();
      populateAccess();
      this.initialized = true;
    }
  }

  private void dropTables() {
    String dropAccess = "DROP TABLE Access";
    String dropPageview = "DROP TABLE Pageview";

    try {
      Statement st = connection.createStatement();
      st.executeUpdate(dropAccess);
      st.executeUpdate(dropPageview);
      connection.commit();
      st.close();
    } catch (SQLException ex) {
      // ignore: the tables may not exist yet on the first run
    }
  }

  private void createTables() throws SQLException {
    String createAccess =
        "CREATE TABLE " +
        "Access(url VARCHAR(100) NOT NULL," +
        " referrer VARCHAR(100)," +
        " time BIGINT NOT NULL, " +
        " PRIMARY KEY (url, time))";
    String createPageview =
        "CREATE TABLE " +
        "Pageview(url VARCHAR(100) NOT NULL," +
        " pageview BIGINT NOT NULL, " +
        " PRIMARY KEY (url))";

    Statement st = connection.createStatement();
    try {
      st.executeUpdate(createAccess);
      st.executeUpdate(createPageview);
      connection.commit();
    } finally {
      st.close();
    }
  }

  /**
   * Populates the Access table with generated records.
   */
  private void populateAccess() throws SQLException {
    PreparedStatement statement = null;
    try {
      statement = connection.prepareStatement(
          "INSERT INTO Access(url, referrer, time)" +
          " VALUES (?, ?, ?)");

      Random random = new Random();

      int time = random.nextInt(50) + 50;

      final int PROBABILITY_PRECISION = 100;  //  1 / 100
      final int NEW_PAGE_PROBABILITY  = 15;   // 15 / 100

      // pages in the site:
      String[] pages = {"/a", "/b", "/c", "/d", "/e",
                        "/f", "/g", "/h", "/i", "/j"};
      // linkMatrix[i] is the array of pages (indexes) that page_i links to
      int[][] linkMatrix = {{1,5,7}, {0,7,4,6}, {0,1,7,8}, {0,2,4,6,7,9},
          {0,1}, {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
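
      // For example, linkMatrix[0] = {1,5,7}: page "/a" links to "/b", "/f"
      // and "/h". At each step the surfer below jumps to a uniformly random
      // page with probability 15/100 (resetting the referrer), and otherwise
      // follows a uniformly random outgoing link from the current page.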
      // a mini model of user browsing a la pagerank
      int currentPage = random.nextInt(pages.length);
      String referrer = null;

      for (int i = 0; i < time; i++) {
        statement.setString(1, pages[currentPage]);
        statement.setString(2, referrer);
        statement.setLong(3, i);
        statement.execute();

        int action = random.nextInt(PROBABILITY_PRECISION);

        // go to a new page with probability
        // NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
        if (action < NEW_PAGE_PROBABILITY) {
          currentPage = random.nextInt(pages.length); // a random page
          referrer = null;
        } else {
          referrer = pages[currentPage];
          action = random.nextInt(linkMatrix[currentPage].length);
          currentPage = linkMatrix[currentPage][action];
        }
      }

      connection.commit();

    } catch (SQLException ex) {
      connection.rollback();
      throw ex;
    } finally {
      if (statement != null) {
        statement.close();
      }
    }
  }

  /** Verifies that the results are correct. */
  private boolean verify() throws SQLException {
    // check the total number of pageviews
    String countAccessQuery = "SELECT COUNT(*) FROM Access";
    String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
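
    // Each Access row is exactly one page view, and the job emits one
    // Pageview row per distinct url whose counts partition the Access rows,
    // so a correct run satisfies COUNT(*) == SUM(pageview) (and both are
    // non-zero, since populateAccess inserts at least 50 rows).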
    Statement st = null;
    ResultSet rs = null;
    try {
      st = connection.createStatement();
      rs = st.executeQuery(countAccessQuery);
      rs.next();
      long totalPageview = rs.getLong(1);

      rs = st.executeQuery(sumPageviewQuery);
      rs.next();
      long sumPageview = rs.getLong(1);

      LOG.info("totalPageview=" + totalPageview);
      LOG.info("sumPageview=" + sumPageview);

      return totalPageview == sumPageview && totalPageview != 0;
    } finally {
      // close the ResultSet before its owning Statement
      if (rs != null)
        rs.close();
      if (st != null)
        st.close();
    }
  }

  /**
   * Holds a &lt;url, referrer, time&gt; tuple. Implements Writable for
   * Hadoop's serialization and DBWritable for reading/writing JDBC rows.
   */
  static class AccessRecord implements Writable, DBWritable {
    String url;
    String referrer;
    long time;

    @Override
    public void readFields(DataInput in) throws IOException {
      this.url = Text.readString(in);
      this.referrer = Text.readString(in);
      this.time = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, url);
      // referrer may be null in the database; write an empty string instead
      Text.writeString(out, referrer == null ? "" : referrer);
      out.writeLong(time);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
      this.url = resultSet.getString(1);
      this.referrer = resultSet.getString(2);
      this.time = resultSet.getLong(3);
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
      statement.setString(1, url);
      statement.setString(2, referrer);
      statement.setLong(3, time);
    }
  }

  /** Holds a &lt;url, pageview&gt; tuple. */
  static class PageviewRecord implements Writable, DBWritable {
    String url;
    long pageview;

    public PageviewRecord(String url, long pageview) {
      this.url = url;
      this.pageview = pageview;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.url = Text.readString(in);
      this.pageview = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, url);
      out.writeLong(pageview);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
      this.url = resultSet.getString(1);
      this.pageview = resultSet.getLong(2);
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
      statement.setString(1, url);
      statement.setLong(2, pageview);
    }

    @Override
    public String toString() {
      return url + " " + pageview;
    }
  }

  /**
   * Mapper extracts URLs from the AccessRecord (tuples from the db),
   * and emits a &lt;url,1&gt; pair for each access record.
   */
  static class PageviewMapper extends MapReduceBase
      implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {

    LongWritable ONE = new LongWritable(1L);

    @Override
    public void map(LongWritable key, AccessRecord value,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      Text oKey = new Text(value.url);
      output.collect(oKey, ONE);
    }
  }

  /**
   * Reducer sums up the pageviews and emits a PageviewRecord,
   * which will correspond to one tuple in the db.
   */
  static class PageviewReducer extends MapReduceBase
      implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {

    NullWritable n = NullWritable.get();

    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
        OutputCollector<PageviewRecord, NullWritable> output,
        Reporter reporter) throws IOException {
      long sum = 0L;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(new PageviewRecord(key.toString(), sum), n);
    }
  }
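
  // The reducer's output key (PageviewRecord) is what DBOutputFormat binds
  // into its generated INSERT statement via
  // DBWritable.write(PreparedStatement); the NullWritable value is ignored.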

  // Usage: DBCountPageView [driverClass dburl]
  @Override
  public int run(String[] args) throws Exception {
    String driverClassName = DRIVER_CLASS;
    String url = DB_URL;

    if (args.length > 1) {
      driverClassName = args[0];
      url = args[1];
    }

    initialize(driverClassName, url);

    JobConf job = new JobConf(getConf(), DBCountPageView.class);

    job.setJobName("Count Pageviews of URLs");
    job.setMapperClass(PageviewMapper.class);
    job.setCombinerClass(LongSumReducer.class);
    job.setReducerClass(PageviewReducer.class);

    DBConfiguration.configureDB(job, driverClassName, url);

    DBInputFormat.setInput(job, AccessRecord.class, "Access",
        null, "url", AccessFieldNames);
    DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
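
    // DBInputFormat derives the split queries from the arguments above; with
    // a null conditions string and "url" as the order-by column, each map
    // task reads roughly SELECT url, referrer, time FROM Access ORDER BY url
    // (paged per split). DBOutputFormat correspondingly generates
    // INSERT INTO Pageview (url, pageview) VALUES (?, ?).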

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(PageviewRecord.class);
    job.setOutputValueClass(NullWritable.class);

    try {
      JobClient.runJob(job);
      boolean correct = verify();
      if (!correct) {
        throw new RuntimeException("Evaluation was not correct!");
      }
    } finally {
      shutdown();
    }
    return 0;
  }
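
  // A minimal sketch of how the example is typically invoked; the jar name
  // and the "dbcount" driver alias are assumptions based on the standard
  // Hadoop examples packaging, and the MySQL driver/URL are placeholders:
  //
  //   # local run against the embedded HSQLDB instance
  //   hadoop jar hadoop-examples.jar dbcount
  //
  //   # run against an external database (JDBC driver jar on the classpath)
  //   hadoop jar hadoop-examples.jar dbcount com.mysql.jdbc.Driver \
  //       jdbc:mysql://localhost/URLAccess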

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new DBCountPageView(), args);
    System.exit(ret);
  }
}