MapOutputFile.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.fs.LocalDirAllocator;
  22. import org.apache.hadoop.fs.Path;
  23. /**
  24.  * Manipulate the working area for the transient store for maps and reduces.
  25.  */ 
  26. class MapOutputFile {
  27.   private JobConf conf;
  28.   private JobID jobId;
  29.   
  30.   MapOutputFile() {
  31.   }
  32.   MapOutputFile(JobID jobId) {
  33.     this.jobId = jobId;
  34.   }
  35.   private LocalDirAllocator lDirAlloc = 
  36.                             new LocalDirAllocator("mapred.local.dir");
  37.   
  38.   /** Return the path to local map output file created earlier
  39.    * @param mapTaskId a map task id
  40.    */
  41.   public Path getOutputFile(TaskAttemptID mapTaskId)
  42.     throws IOException {
  43.     return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
  44.                        jobId.toString(), mapTaskId.toString())
  45.                        + "/file.out", conf);
  46.   }
  47.   /** Create a local map output file name.
  48.    * @param mapTaskId a map task id
  49.    * @param size the size of the file
  50.    */
  51.   public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
  52.     throws IOException {
  53.     return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
  54.                        jobId.toString(), mapTaskId.toString())
  55.                        + "/file.out", size, conf);
  56.   }
  57.   /** Return the path to a local map output index file created earlier
  58.    * @param mapTaskId a map task id
  59.    */
  60.   public Path getOutputIndexFile(TaskAttemptID mapTaskId)
  61.     throws IOException {
  62.     return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
  63.                        jobId.toString(), mapTaskId.toString())
  64.                        + "/file.out.index", conf);
  65.   }
  66.   /** Create a local map output index file name.
  67.    * @param mapTaskId a map task id
  68.    * @param size the size of the file
  69.    */
  70.   public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
  71.     throws IOException {
  72.     return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
  73.                        jobId.toString(), mapTaskId.toString())
  74.                        + "/file.out.index", 
  75.                        size, conf);
  76.   }
  77.   /** Return a local map spill file created earlier.
  78.    * @param mapTaskId a map task id
  79.    * @param spillNumber the number
  80.    */
  81.   public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
  82.     throws IOException {
  83.     return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
  84.                        jobId.toString(), mapTaskId.toString())
  85.                        + "/spill" 
  86.                        + spillNumber + ".out", conf);
  87.   }
  88.   /** Create a local map spill file name.
  89.    * @param mapTaskId a map task id
  90.    * @param spillNumber the number
  91.    * @param size the size of the file
  92.    */
  93.   public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, 
  94.          long size) throws IOException {
  95.     return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
  96.                        jobId.toString(), mapTaskId.toString())
  97.                        + "/spill" + 
  98.                        spillNumber + ".out", size, conf);
  99.   }
  100.   /** Return a local map spill index file created earlier
  101.    * @param mapTaskId a map task id
  102.    * @param spillNumber the number
  103.    */
  104.   public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
  105.     throws IOException {
  106.     return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
  107.                        jobId.toString(), mapTaskId.toString())
  108.                        + "/spill" + 
  109.                        spillNumber + ".out.index", conf);
  110.   }
  111.   /** Create a local map spill index file name.
  112.    * @param mapTaskId a map task id
  113.    * @param spillNumber the number
  114.    * @param size the size of the file
  115.    */
  116.   public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
  117.          long size) throws IOException {
  118.     return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
  119.                        jobId.toString(), mapTaskId.toString())
  120.                        + "/spill" + spillNumber + 
  121.                        ".out.index", size, conf);
  122.   }
  123.   /** Return a local reduce input file created earlier
  124.    * @param mapTaskId a map task id
  125.    * @param reduceTaskId a reduce task id
  126.    */
  127.   public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
  128.     throws IOException {
  129.     // TODO *oom* should use a format here
  130.     return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
  131.                        jobId.toString(), reduceTaskId.toString())
  132.                        + "/map_" + mapId + ".out",
  133.                        conf);
  134.   }
  135.   /** Create a local reduce input file name.
  136.    * @param mapTaskId a map task id
  137.    * @param reduceTaskId a reduce task id
  138.    * @param size the size of the file
  139.    */
  140.   public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, 
  141.                                    long size)
  142.     throws IOException {
  143.     // TODO *oom* should use a format here
  144.     return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
  145.                        jobId.toString(), reduceTaskId.toString())
  146.                        + "/map_" + mapId.getId() + ".out", 
  147.                        size, conf);
  148.   }
  149.   /** Removes all of the files related to a task. */
  150.   public void removeAll(TaskAttemptID taskId) throws IOException {
  151.     conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
  152.                           jobId.toString(), taskId.toString())
  153. );
  154.   }
  155.   public void setConf(Configuration conf) {
  156.     if (conf instanceof JobConf) {
  157.       this.conf = (JobConf) conf;
  158.     } else {
  159.       this.conf = new JobConf(conf);
  160.     }
  161.   }
  162.   
  163.   public void setJobId(JobID jobId) {
  164.     this.jobId = jobId;
  165.   }
  166. }