TeraInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples.terasort;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.io.LongWritable;
  25. import org.apache.hadoop.io.NullWritable;
  26. import org.apache.hadoop.io.SequenceFile;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.mapred.FileInputFormat;
  29. import org.apache.hadoop.mapred.FileSplit;
  30. import org.apache.hadoop.mapred.InputSplit;
  31. import org.apache.hadoop.mapred.JobConf;
  32. import org.apache.hadoop.mapred.LineRecordReader;
  33. import org.apache.hadoop.mapred.RecordReader;
  34. import org.apache.hadoop.mapred.Reporter;
  35. import org.apache.hadoop.util.IndexedSortable;
  36. import org.apache.hadoop.util.QuickSort;
  37. /**
  38.  * An input format that reads the first 10 characters of each line as the key
  39.  * and the rest of the line as the value. Both key and value are represented
  40.  * as Text.
  41.  */
  42. public class TeraInputFormat extends FileInputFormat<Text,Text> {
  43.   static final String PARTITION_FILENAME = "_partition.lst";
  44.   static final String SAMPLE_SIZE = "terasort.partitions.sample";
  45.   private static JobConf lastConf = null;
  46.   private static InputSplit[] lastResult = null;
  47.   static class TextSampler implements IndexedSortable {
  48.     private ArrayList<Text> records = new ArrayList<Text>();
  49.     public int compare(int i, int j) {
  50.       Text left = records.get(i);
  51.       Text right = records.get(j);
  52.       return left.compareTo(right);
  53.     }
  54.     public void swap(int i, int j) {
  55.       Text left = records.get(i);
  56.       Text right = records.get(j);
  57.       records.set(j, left);
  58.       records.set(i, right);
  59.     }
  60.     public void addKey(Text key) {
  61.       records.add(new Text(key));
  62.     }
  63.     /**
  64.      * Find the split points for a given sample. The sample keys are sorted
  65.      * and down sampled to find even split points for the partitions. The
  66.      * returned keys should be the start of their respective partitions.
  67.      * @param numPartitions the desired number of partitions
  68.      * @return an array of size numPartitions - 1 that holds the split points
  69.      */
  70.     Text[] createPartitions(int numPartitions) {
  71.       int numRecords = records.size();
  72.       System.out.println("Making " + numPartitions + " from " + numRecords + 
  73.                          " records");
  74.       if (numPartitions > numRecords) {
  75.         throw new IllegalArgumentException
  76.           ("Requested more partitions than input keys (" + numPartitions +
  77.            " > " + numRecords + ")");
  78.       }
  79.       new QuickSort().sort(this, 0, records.size());
  80.       float stepSize = numRecords / (float) numPartitions;
  81.       System.out.println("Step size is " + stepSize);
  82.       Text[] result = new Text[numPartitions-1];
  83.       for(int i=1; i < numPartitions; ++i) {
  84.         result[i-1] = records.get(Math.round(stepSize * i));
  85.       }
  86.       return result;
  87.     }
  88.   }
  89.   
  90.   /**
  91.    * Use the input splits to take samples of the input and generate sample
  92.    * keys. By default reads 100,000 keys from 10 locations in the input, sorts
  93.    * them and picks N-1 keys to generate N equally sized partitions.
  94.    * @param conf the job to sample
  95.    * @param partFile where to write the output file to
  96.    * @throws IOException if something goes wrong
  97.    */
  98.   public static void writePartitionFile(JobConf conf, 
  99.                                         Path partFile) throws IOException {
  100.     TeraInputFormat inFormat = new TeraInputFormat();
  101.     TextSampler sampler = new TextSampler();
  102.     Text key = new Text();
  103.     Text value = new Text();
  104.     int partitions = conf.getNumReduceTasks();
  105.     long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
  106.     InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
  107.     int samples = Math.min(10, splits.length);
  108.     long recordsPerSample = sampleSize / samples;
  109.     int sampleStep = splits.length / samples;
  110.     long records = 0;
  111.     // take N samples from different parts of the input
  112.     for(int i=0; i < samples; ++i) {
  113.       RecordReader<Text,Text> reader = 
  114.         inFormat.getRecordReader(splits[sampleStep * i], conf, null);
  115.       while (reader.next(key, value)) {
  116.         sampler.addKey(key);
  117.         records += 1;
  118.         if ((i+1) * recordsPerSample <= records) {
  119.           break;
  120.         }
  121.       }
  122.     }
  123.     FileSystem outFs = partFile.getFileSystem(conf);
  124.     if (outFs.exists(partFile)) {
  125.       outFs.delete(partFile, false);
  126.     }
  127.     SequenceFile.Writer writer = 
  128.       SequenceFile.createWriter(outFs, conf, partFile, Text.class, 
  129.                                 NullWritable.class);
  130.     NullWritable nullValue = NullWritable.get();
  131.     for(Text split : sampler.createPartitions(partitions)) {
  132.       writer.append(split, nullValue);
  133.     }
  134.     writer.close();
  135.   }
  136.   static class TeraRecordReader implements RecordReader<Text,Text> {
  137.     private LineRecordReader in;
  138.     private LongWritable junk = new LongWritable();
  139.     private Text line = new Text();
  140.     private static int KEY_LENGTH = 10;
  141.     public TeraRecordReader(Configuration job, 
  142.                             FileSplit split) throws IOException {
  143.       in = new LineRecordReader(job, split);
  144.     }
  145.     public void close() throws IOException {
  146.       in.close();
  147.     }
  148.     public Text createKey() {
  149.       return new Text();
  150.     }
  151.     public Text createValue() {
  152.       return new Text();
  153.     }
  154.     public long getPos() throws IOException {
  155.       return in.getPos();
  156.     }
  157.     public float getProgress() throws IOException {
  158.       return in.getProgress();
  159.     }
  160.     public boolean next(Text key, Text value) throws IOException {
  161.       if (in.next(junk, line)) {
  162.         if (line.getLength() < KEY_LENGTH) {
  163.           key.set(line);
  164.           value.clear();
  165.         } else {
  166.           byte[] bytes = line.getBytes();
  167.           key.set(bytes, 0, KEY_LENGTH);
  168.           value.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
  169.         }
  170.         return true;
  171.       } else {
  172.         return false;
  173.       }
  174.     }
  175.   }
  176.   @Override
  177.   public RecordReader<Text, Text> 
  178.       getRecordReader(InputSplit split,
  179.                       JobConf job, 
  180.                       Reporter reporter) throws IOException {
  181.     return new TeraRecordReader(job, (FileSplit) split);
  182.   }
  183.   @Override
  184.   public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
  185.     if (conf == lastConf) {
  186.       return lastResult;
  187.     }
  188.     lastConf = conf;
  189.     lastResult = super.getSplits(conf, splits);
  190.     return lastResult;
  191.   }
  192. }