SpillRecord.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.nio.ByteBuffer;
  21. import java.nio.LongBuffer;
  22. import java.util.zip.CRC32;
  23. import java.util.zip.CheckedInputStream;
  24. import java.util.zip.CheckedOutputStream;
  25. import java.util.zip.Checksum;
  26. import org.apache.hadoop.fs.ChecksumException;
  27. import org.apache.hadoop.fs.FSDataInputStream;
  28. import org.apache.hadoop.fs.FSDataOutputStream;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.io.IOUtils;
  32. import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
  33. class SpillRecord {
  34.   /** Backing store */
  35.   private final ByteBuffer buf;
  36.   /** View of backing storage as longs */
  37.   private final LongBuffer entries;
  38.   public SpillRecord(int numPartitions) {
  39.     buf = ByteBuffer.allocate(
  40.         numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  41.     entries = buf.asLongBuffer();
  42.   }
  43.   public SpillRecord(Path indexFileName, JobConf job) throws IOException {
  44.     this(indexFileName, job, new CRC32());
  45.   }
  46.   public SpillRecord(Path indexFileName, JobConf job, Checksum crc)
  47.       throws IOException {
  48.     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  49.     final FSDataInputStream in = rfs.open(indexFileName);
  50.     try {
  51.       final long length = rfs.getFileStatus(indexFileName).getLen();
  52.       final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
  53.       final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  54.       buf = ByteBuffer.allocate(size);
  55.       if (crc != null) {
  56.         crc.reset();
  57.         CheckedInputStream chk = new CheckedInputStream(in, crc);
  58.         IOUtils.readFully(chk, buf.array(), 0, size);
  59.         if (chk.getChecksum().getValue() != in.readLong()) {
  60.           throw new ChecksumException("Checksum error reading spill index: " +
  61.                                 indexFileName, -1);
  62.         }
  63.       } else {
  64.         IOUtils.readFully(in, buf.array(), 0, size);
  65.       }
  66.       entries = buf.asLongBuffer();
  67.     } finally {
  68.       in.close();
  69.     }
  70.   }
  71.   /**
  72.    * Return number of IndexRecord entries in this spill.
  73.    */
  74.   public int size() {
  75.     return entries.capacity() / (MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
  76.   }
  77.   /**
  78.    * Get spill offsets for given partition.
  79.    */
  80.   public IndexRecord getIndex(int partition) {
  81.     final int pos = partition * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
  82.     return new IndexRecord(entries.get(pos), entries.get(pos + 1),
  83.                            entries.get(pos + 2));
  84.   }
  85.   /**
  86.    * Set spill offsets for given partition.
  87.    */
  88.   public void putIndex(IndexRecord rec, int partition) {
  89.     final int pos = partition * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
  90.     entries.put(pos, rec.startOffset);
  91.     entries.put(pos + 1, rec.rawLength);
  92.     entries.put(pos + 2, rec.partLength);
  93.   }
  94.   /**
  95.    * Write this spill record to the location provided.
  96.    */
  97.   public void writeToFile(Path loc, JobConf job)
  98.       throws IOException {
  99.     writeToFile(loc, job, new CRC32());
  100.   }
  101.   public void writeToFile(Path loc, JobConf job, Checksum crc)
  102.       throws IOException {
  103.     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  104.     CheckedOutputStream chk = null;
  105.     final FSDataOutputStream out = rfs.create(loc);
  106.     try {
  107.       if (crc != null) {
  108.         crc.reset();
  109.         chk = new CheckedOutputStream(out, crc);
  110.         chk.write(buf.array());
  111.         out.writeLong(chk.getChecksum().getValue());
  112.       } else {
  113.         out.write(buf.array());
  114.       }
  115.     } finally {
  116.       if (chk != null) {
  117.         chk.close();
  118.       } else {
  119.         out.close();
  120.       }
  121.     }
  122.   }
  123. }
  124. class IndexRecord {
  125.   long startOffset;
  126.   long rawLength;
  127.   long partLength;
  128.   public IndexRecord() { }
  129.   public IndexRecord(long startOffset, long rawLength, long partLength) {
  130.     this.startOffset = startOffset;
  131.     this.rawLength = rawLength;
  132.     this.partLength = partLength;
  133.   }
  134. }