TeraInputFormat.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples.terasort;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;

/**
 * An input format that reads the first 10 characters of each line as the key
 * and the rest of the line as the value. Both key and value are represented
 * as Text.
 */
public class TeraInputFormat extends FileInputFormat<Text,Text> {

  /** Name of the file that holds the sorted partition split points. */
  static final String PARTITION_FILENAME = "_partition.lst";

  /** Configuration key for the total number of records to sample (default 100,000). */
  static final String SAMPLE_SIZE = "terasort.partitions.sample";

  /** Cache of the most recent split calculation and the JobConf that produced it. */
  private static JobConf lastConf = null;
  private static InputSplit[] lastResult = null;

  static class TextSampler implements IndexedSortable {
    private ArrayList<Text> records = new ArrayList<Text>();

    public int compare(int i, int j) {
      Text left = records.get(i);
      Text right = records.get(j);
      return left.compareTo(right);
    }

    public void swap(int i, int j) {
      Text left = records.get(i);
      Text right = records.get(j);
      records.set(j, left);
      records.set(i, right);
    }

    public void addKey(Text key) {
      records.add(new Text(key));
    }

    /**
     * Find the split points for a given sample. The sample keys are sorted
     * and down sampled to find even split points for the partitions. The
     * returned keys should be the start of their respective partitions.
     * @param numPartitions the desired number of partitions
     * @return an array of size numPartitions - 1 that holds the split points
     */
    Text[] createPartitions(int numPartitions) {
      int numRecords = records.size();
      System.out.println("Making " + numPartitions + " from " + numRecords +
                         " records");
      if (numPartitions > numRecords) {
        throw new IllegalArgumentException
          ("Requested more partitions than input keys (" + numPartitions +
           " > " + numRecords + ")");
      }
      new QuickSort().sort(this, 0, records.size());
      float stepSize = numRecords / (float) numPartitions;
      System.out.println("Step size is " + stepSize);
      Text[] result = new Text[numPartitions-1];
      for(int i=1; i < numPartitions; ++i) {
        result[i-1] = records.get(Math.round(stepSize * i));
      }
      return result;
    }
  }

  /**
   * Use the input splits to take samples of the input and generate sample
   * keys. By default reads 100,000 keys from 10 locations in the input, sorts
   * them and picks N-1 keys to generate N equally sized partitions.
   * @param conf the job to sample
   * @param partFile where to write the output file to
   * @throws IOException if something goes wrong
   */
  public static void writePartitionFile(JobConf conf,
                                        Path partFile) throws IOException {
    TeraInputFormat inFormat = new TeraInputFormat();
    TextSampler sampler = new TextSampler();
    Text key = new Text();
    Text value = new Text();
    int partitions = conf.getNumReduceTasks();
    long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
    InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
    int samples = Math.min(10, splits.length);
    long recordsPerSample = sampleSize / samples;
    int sampleStep = splits.length / samples;
    long records = 0;
    // take N samples from different parts of the input
    for(int i=0; i < samples; ++i) {
      RecordReader<Text,Text> reader =
        inFormat.getRecordReader(splits[sampleStep * i], conf, null);
      while (reader.next(key, value)) {
        sampler.addKey(key);
        records += 1;
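        // Stop reading this split once the cumulative sample count reaches
        // this pass's share of the quota, (i+1) * recordsPerSample, keeping
        // the total number of sampled keys close to the configured sample size.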
        if ((i+1) * recordsPerSample <= records) {
          break;
        }
      }
    }
    FileSystem outFs = partFile.getFileSystem(conf);
    if (outFs.exists(partFile)) {
      outFs.delete(partFile, false);
    }
    SequenceFile.Writer writer =
      SequenceFile.createWriter(outFs, conf, partFile, Text.class,
                                NullWritable.class);
    NullWritable nullValue = NullWritable.get();
    for(Text split : sampler.createPartitions(partitions)) {
      writer.append(split, nullValue);
    }
    writer.close();
  }

  static class TeraRecordReader implements RecordReader<Text,Text> {
    private LineRecordReader in;
    private LongWritable junk = new LongWritable();
    private Text line = new Text();
    private static int KEY_LENGTH = 10;

    public TeraRecordReader(Configuration job,
                            FileSplit split) throws IOException {
      in = new LineRecordReader(job, split);
    }

    public void close() throws IOException {
      in.close();
    }

    public Text createKey() {
      return new Text();
    }

    public Text createValue() {
      return new Text();
    }

    public long getPos() throws IOException {
      return in.getPos();
    }

    public float getProgress() throws IOException {
      return in.getProgress();
    }

    public boolean next(Text key, Text value) throws IOException {
      if (in.next(junk, line)) {
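        // The first KEY_LENGTH (10) bytes of the line become the key and the
        // remainder becomes the value; a line shorter than the key length is
        // used whole as the key with an empty value.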
        if (line.getLength() < KEY_LENGTH) {
          key.set(line);
          value.clear();
        } else {
          byte[] bytes = line.getBytes();
          key.set(bytes, 0, KEY_LENGTH);
          value.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
        }
        return true;
      } else {
        return false;
      }
    }
  }

  @Override
  public RecordReader<Text, Text>
      getRecordReader(InputSplit split,
                      JobConf job,
                      Reporter reporter) throws IOException {
    return new TeraRecordReader(job, (FileSplit) split);
  }

  @Override
  public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
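    // getSplits is called both while sampling for the partition file and
    // again at job submission; cache the result so the calculation only
    // runs once per JobConf.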
    if (conf == lastConf) {
      return lastResult;
    }
    lastConf = conf;
    lastResult = super.getSplits(conf, splits);
    return lastResult;
  }
}
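
For context, a minimal driver sketch follows. It is not part of this file; the class name, argument handling, and job wiring are assumptions that roughly mirror how the TeraSort driver uses this input format: the partition file is written before the job is submitted, and the format is registered on the JobConf.

package org.apache.hadoop.examples.terasort;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/** Hypothetical demo driver; the real driver lives in TeraSort.java. */
public class TeraInputFormatDemo {

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(TeraInputFormatDemo.class);
    job.setJobName("terainputformat-demo");

    Path input = new Path(args[0]);
    Path output = new Path(args[1]);
    TeraInputFormat.setInputPaths(job, input);   // static helper inherited from FileInputFormat
    FileOutputFormat.setOutputPath(job, output);

    job.setInputFormat(TeraInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Sample the input and write the split points before submission. The real
    // TeraSort driver additionally publishes this file through the
    // DistributedCache and installs a total-order partitioner that reads it;
    // both steps are omitted from this sketch.
    Path partitionFile = new Path(input, TeraInputFormat.PARTITION_FILENAME);
    TeraInputFormat.writePartitionFile(job, partitionFile);

    JobClient.runJob(job);
  }
}

Placing the demo in the same package keeps the package-private PARTITION_FILENAME constant accessible; outside the package the literal file name would have to be repeated.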