TaskTracker.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:110k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18.  package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.FileNotFoundException;
  21. import java.io.IOException;
  22. import java.io.OutputStream;
  23. import java.io.RandomAccessFile;
  24. import java.net.InetSocketAddress;
  25. import java.net.URI;
  26. import java.net.URISyntaxException;
  27. import java.util.ArrayList;
  28. import java.util.Collections;
  29. import java.util.HashMap;
  30. import java.util.HashSet;
  31. import java.util.Iterator;
  32. import java.util.LinkedHashMap;
  33. import java.util.LinkedList;
  34. import java.util.List;
  35. import java.util.Map;
  36. import java.util.Random;
  37. import java.util.Set;
  38. import java.util.TreeMap;
  39. import java.util.Vector;
  40. import java.util.concurrent.BlockingQueue;
  41. import java.util.concurrent.LinkedBlockingQueue;
  42. import java.util.regex.Pattern;
  43. import javax.servlet.ServletContext;
  44. import javax.servlet.ServletException;
  45. import javax.servlet.http.HttpServlet;
  46. import javax.servlet.http.HttpServletRequest;
  47. import javax.servlet.http.HttpServletResponse;
  48. import org.apache.commons.logging.Log;
  49. import org.apache.commons.logging.LogFactory;
  50. import org.apache.hadoop.conf.Configuration;
  51. import org.apache.hadoop.filecache.DistributedCache;
  52. import org.apache.hadoop.fs.DF;
  53. import org.apache.hadoop.fs.FSDataInputStream;
  54. import org.apache.hadoop.fs.FileStatus;
  55. import org.apache.hadoop.fs.FileSystem;
  56. import org.apache.hadoop.fs.FileUtil;
  57. import org.apache.hadoop.fs.LocalDirAllocator;
  58. import org.apache.hadoop.fs.LocalFileSystem;
  59. import org.apache.hadoop.fs.Path;
  60. import org.apache.hadoop.http.HttpServer;
  61. import org.apache.hadoop.io.IntWritable;
  62. import org.apache.hadoop.ipc.RPC;
  63. import org.apache.hadoop.ipc.RemoteException;
  64. import org.apache.hadoop.ipc.Server;
  65. import org.apache.hadoop.mapred.TaskStatus.Phase;
  66. import org.apache.hadoop.mapred.pipes.Submitter;
  67. import org.apache.hadoop.metrics.MetricsContext;
  68. import org.apache.hadoop.metrics.MetricsException;
  69. import org.apache.hadoop.metrics.MetricsRecord;
  70. import org.apache.hadoop.metrics.MetricsUtil;
  71. import org.apache.hadoop.metrics.Updater;
  72. import org.apache.hadoop.net.DNS;
  73. import org.apache.hadoop.net.NetUtils;
  74. import org.apache.hadoop.security.SecurityUtil;
  75. import org.apache.hadoop.security.authorize.ConfiguredPolicy;
  76. import org.apache.hadoop.security.authorize.PolicyProvider;
  77. import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
  78. import org.apache.hadoop.util.DiskChecker;
  79. import org.apache.hadoop.util.MemoryCalculatorPlugin;
  80. import org.apache.hadoop.util.ProcfsBasedProcessTree;
  81. import org.apache.hadoop.util.ReflectionUtils;
  82. import org.apache.hadoop.util.RunJar;
  83. import org.apache.hadoop.util.StringUtils;
  84. import org.apache.hadoop.util.VersionInfo;
  85. import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  86. import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  87. /*******************************************************
  88.  * TaskTracker is a process that starts and tracks MR Tasks
  89.  * in a networked environment.  It contacts the JobTracker
  90.  * for Task assignments and reporting results.
  91.  *
  92.  *******************************************************/
  93. public class TaskTracker 
  94.              implements MRConstants, TaskUmbilicalProtocol, Runnable {
  95.   static final long WAIT_FOR_DONE = 3 * 1000;
  96.   private int httpPort;
  97.   static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
  98.   static{
  99.     Configuration.addDefaultResource("mapred-default.xml");
  100.     Configuration.addDefaultResource("mapred-site.xml");
  101.   }
  102.   public static final Log LOG =
  103.     LogFactory.getLog(TaskTracker.class);
  104.   public static final String MR_CLIENTTRACE_FORMAT =
  105.         "src: %s" +     // src IP
  106.         ", dest: %s" +  // dst IP
  107.         ", bytes: %s" + // byte count
  108.         ", op: %s" +    // operation
  109.         ", cliID: %s";  // task id
  110.   public static final Log ClientTraceLog =
  111.     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
  112.   volatile boolean running = true;
  113.   private LocalDirAllocator localDirAllocator;
  114.   String taskTrackerName;
  115.   String localHostname;
  116.   InetSocketAddress jobTrackAddr;
  117.     
  118.   InetSocketAddress taskReportAddress;
  119.   Server taskReportServer = null;
  120.   InterTrackerProtocol jobClient;
  121.     
  122.   // last heartbeat response recieved
  123.   short heartbeatResponseId = -1;
  124.   /*
  125.    * This is the last 'status' report sent by this tracker to the JobTracker.
  126.    * 
  127.    * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
  128.    * indicating that a 'fresh' status report be generated; in the event the
  129.    * rpc calls fails for whatever reason, the previous status report is sent
  130.    * again.
  131.    */
  132.   TaskTrackerStatus status = null;
  133.   
  134.   // The system-directory on HDFS where job files are stored 
  135.   Path systemDirectory = null;
  136.   
  137.   // The filesystem where job files are stored
  138.   FileSystem systemFS = null;
  139.   
  140.   private final HttpServer server;
  141.     
  142.   volatile boolean shuttingDown = false;
  143.     
  144.   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
  145.   /**
  146.    * Map from taskId -> TaskInProgress.
  147.    */
  148.   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
  149.   Map<JobID, RunningJob> runningJobs = null;
  150.   volatile int mapTotal = 0;
  151.   volatile int reduceTotal = 0;
  152.   boolean justStarted = true;
  153.   boolean justInited = true;
  154.   // Mark reduce tasks that are shuffling to rollback their events index
  155.   Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();
  156.     
  157.   //dir -> DF
  158.   Map<String, DF> localDirsDf = new HashMap<String, DF>();
  159.   long minSpaceStart = 0;
  160.   //must have this much space free to start new tasks
  161.   boolean acceptNewTasks = true;
  162.   long minSpaceKill = 0;
  163.   //if we run under this limit, kill one task
  164.   //and make sure we never receive any new jobs
  165.   //until all the old tasks have been cleaned up.
  166.   //this is if a machine is so full it's only good
  167.   //for serving map output to the other nodes
  168.   static Random r = new Random();
  169.   private static final String SUBDIR = "taskTracker";
  170.   private static final String CACHEDIR = "archive";
  171.   private static final String JOBCACHE = "jobcache";
  172.   private static final String PID = "pid";
  173.   private static final String OUTPUT = "output";
  174.   private JobConf originalConf;
  175.   private JobConf fConf;
  176.   private int maxCurrentMapTasks;
  177.   private int maxCurrentReduceTasks;
  178.   private int failures;
  179.   private MapEventsFetcherThread mapEventsFetcher;
  180.   int workerThreads;
  181.   private CleanupQueue directoryCleanupThread;
  182.   volatile JvmManager jvmManager;
  183.   
  184.   private TaskMemoryManagerThread taskMemoryManager;
  185.   private boolean taskMemoryManagerEnabled = true;
  186.   private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  187.   private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  188.   private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
  189.   private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
  190.   // Cluster wide default value for max-vm per task
  191.   private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
  192.   // Cluster wide upper limit on max-vm per task
  193.   private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
  194.   /**
  195.    * Configuration property to specify the amount of virtual memory that has to
  196.    * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
  197.    * virtual memory should be a part of the total virtual memory available on
  198.    * the TaskTracker. TaskTracker obtains the total virtual memory available on
  199.    * the system by using a {@link MemoryCalculatorPlugin}. The total physical
  200.    * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
  201.    * MemoryCalculatorPlugin implementation.
  202.    * 
  203.    * <p>
  204.    * 
  205.    * The reserved virtual memory and the total virtual memory values are
  206.    * reported by the TaskTracker as part of heart-beat so that they can
  207.    * considered by a scheduler.
  208.    * 
  209.    * <p>
  210.    * 
  211.    * These two values are also used by the TaskTracker for tracking tasks'
  212.    * memory usage. Memory management functionality on a TaskTracker is disabled
  213.    * if this property is not set, if it more than the total virtual memory
  214.    * reported by MemoryCalculatorPlugin, or if either of the values is negative.
  215.    */
  216.   static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
  217.       "mapred.tasktracker.vmem.reserved";
  218.   /**
  219.    * Configuration property to specify the amount of physical memory that has to
  220.    * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
  221.    * physical memory should be a part of the total physical memory available on
  222.    * the TaskTracker. TaskTracker obtains the total physical memory available on
  223.    * the system by using a {@link MemoryCalculatorPlugin}. The total physical
  224.    * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
  225.    * MemoryCalculatorPlugin implementation.
  226.    * 
  227.    * <p>
  228.    * 
  229.    * The reserved virtual memory and the total virtual memory values are
  230.    * reported by the TaskTracker as part of heart-beat so that they can
  231.    * considered by a scheduler.
  232.    * 
  233.    */
  234.   static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
  235.       "mapred.tasktracker.pmem.reserved";
  236.   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
  237.       "mapred.tasktracker.memory_calculator_plugin";
  238.   /**
  239.    * the minimum interval between jobtracker polls
  240.    */
  241.   private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
  242.   /**
  243.    * Number of maptask completion events locations to poll for at one time
  244.    */  
  245.   private int probe_sample_size = 500;
  246.   private IndexCache indexCache;
  247.     
  248.   /*
  249.    * A list of commitTaskActions for whom commit response has been received 
  250.    */
  251.   private List<TaskAttemptID> commitResponses = 
  252.             Collections.synchronizedList(new ArrayList<TaskAttemptID>());
  253.   private ShuffleServerMetrics shuffleServerMetrics;
  254.   /** This class contains the methods that should be used for metrics-reporting
  255.    * the specific metrics for shuffle. The TaskTracker is actually a server for
  256.    * the shuffle and hence the name ShuffleServerMetrics.
  257.    */
  258.   private class ShuffleServerMetrics implements Updater {
  259.     private MetricsRecord shuffleMetricsRecord = null;
  260.     private int serverHandlerBusy = 0;
  261.     private long outputBytes = 0;
  262.     private int failedOutputs = 0;
  263.     private int successOutputs = 0;
  264.     ShuffleServerMetrics(JobConf conf) {
  265.       MetricsContext context = MetricsUtil.getContext("mapred");
  266.       shuffleMetricsRecord = 
  267.                            MetricsUtil.createRecord(context, "shuffleOutput");
  268.       this.shuffleMetricsRecord.setTag("sessionId", conf.getSessionId());
  269.       context.registerUpdater(this);
  270.     }
  271.     synchronized void serverHandlerBusy() {
  272.       ++serverHandlerBusy;
  273.     }
  274.     synchronized void serverHandlerFree() {
  275.       --serverHandlerBusy;
  276.     }
  277.     synchronized void outputBytes(long bytes) {
  278.       outputBytes += bytes;
  279.     }
  280.     synchronized void failedOutput() {
  281.       ++failedOutputs;
  282.     }
  283.     synchronized void successOutput() {
  284.       ++successOutputs;
  285.     }
  286.     public void doUpdates(MetricsContext unused) {
  287.       synchronized (this) {
  288.         if (workerThreads != 0) {
  289.           shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 
  290.               100*((float)serverHandlerBusy/workerThreads));
  291.         } else {
  292.           shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 0);
  293.         }
  294.         shuffleMetricsRecord.incrMetric("shuffle_output_bytes", 
  295.                                         outputBytes);
  296.         shuffleMetricsRecord.incrMetric("shuffle_failed_outputs", 
  297.                                         failedOutputs);
  298.         shuffleMetricsRecord.incrMetric("shuffle_success_outputs", 
  299.                                         successOutputs);
  300.         outputBytes = 0;
  301.         failedOutputs = 0;
  302.         successOutputs = 0;
  303.       }
  304.       shuffleMetricsRecord.update();
  305.     }
  306.   }
  307.   
  308.   
  309.   
  310.     
  311.   private TaskTrackerInstrumentation myInstrumentation = null;
  312.   public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
  313.     return myInstrumentation;
  314.   }
  315.   
  316.   /**
  317.    * A list of tips that should be cleaned up.
  318.    */
  319.   private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
  320.     new LinkedBlockingQueue<TaskTrackerAction>();
  321.     
  322.   /**
  323.    * A daemon-thread that pulls tips off the list of things to cleanup.
  324.    */
  325.   private Thread taskCleanupThread = 
  326.     new Thread(new Runnable() {
  327.         public void run() {
  328.           while (true) {
  329.             try {
  330.               TaskTrackerAction action = tasksToCleanup.take();
  331.               if (action instanceof KillJobAction) {
  332.                 purgeJob((KillJobAction) action);
  333.               } else if (action instanceof KillTaskAction) {
  334.                 TaskInProgress tip;
  335.                 KillTaskAction killAction = (KillTaskAction) action;
  336.                 synchronized (TaskTracker.this) {
  337.                   tip = tasks.get(killAction.getTaskID());
  338.                 }
  339.                 LOG.info("Received KillTaskAction for task: " + 
  340.                          killAction.getTaskID());
  341.                 purgeTask(tip, false);
  342.               } else {
  343.                 LOG.error("Non-delete action given to cleanup thread: "
  344.                           + action);
  345.               }
  346.             } catch (Throwable except) {
  347.               LOG.warn(StringUtils.stringifyException(except));
  348.             }
  349.           }
  350.         }
  351.       }, "taskCleanup");
  352.     
  353.   private RunningJob addTaskToJob(JobID jobId, 
  354.                                   TaskInProgress tip) {
  355.     synchronized (runningJobs) {
  356.       RunningJob rJob = null;
  357.       if (!runningJobs.containsKey(jobId)) {
  358.         rJob = new RunningJob(jobId);
  359.         rJob.localized = false;
  360.         rJob.tasks = new HashSet<TaskInProgress>();
  361.         runningJobs.put(jobId, rJob);
  362.       } else {
  363.         rJob = runningJobs.get(jobId);
  364.       }
  365.       synchronized (rJob) {
  366.         rJob.tasks.add(tip);
  367.       }
  368.       runningJobs.notify(); //notify the fetcher thread
  369.       return rJob;
  370.     }
  371.   }
  372.   private void removeTaskFromJob(JobID jobId, TaskInProgress tip) {
  373.     synchronized (runningJobs) {
  374.       RunningJob rjob = runningJobs.get(jobId);
  375.       if (rjob == null) {
  376.         LOG.warn("Unknown job " + jobId + " being deleted.");
  377.       } else {
  378.         synchronized (rjob) {
  379.           rjob.tasks.remove(tip);
  380.         }
  381.       }
  382.     }
  383.   }
  384.   static String getCacheSubdir() {
  385.     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
  386.   }
  387.   static String getJobCacheSubdir() {
  388.     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
  389.   }
  390.   static String getLocalJobDir(String jobid) {
  391. return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
  392.   }
  393.   static String getLocalTaskDir(String jobid, String taskid) {
  394. return getLocalTaskDir(jobid, taskid, false) ; 
  395.   }
  396.   static String getIntermediateOutputDir(String jobid, String taskid) {
  397. return getLocalTaskDir(jobid, taskid) 
  398.            + Path.SEPARATOR + TaskTracker.OUTPUT ; 
  399.   }
  400.   static String getLocalTaskDir(String jobid, 
  401.                                 String taskid, 
  402.                                 boolean isCleanupAttempt) {
  403. String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
  404. if (isCleanupAttempt) { 
  405.       taskDir = taskDir + ".cleanup";
  406. }
  407. return taskDir;
  408.   }
  409.   static String getPidFile(String jobid, 
  410.                            String taskid, 
  411.                            boolean isCleanup) {
  412.     return  getLocalTaskDir(jobid, taskid, isCleanup)
  413.             + Path.SEPARATOR + PID;
  414.   }
  415.   public long getProtocolVersion(String protocol, 
  416.                                  long clientVersion) throws IOException {
  417.     if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
  418.       return TaskUmbilicalProtocol.versionID;
  419.     } else {
  420.       throw new IOException("Unknown protocol for task tracker: " +
  421.                             protocol);
  422.     }
  423.   }
  424.     
  425.   /**
  426.    * Do the real constructor work here.  It's in a separate method
  427.    * so we can call it again and "recycle" the object after calling
  428.    * close().
  429.    */
  430.   synchronized void initialize() throws IOException {
  431.     // use configured nameserver & interface to get local hostname
  432.     this.fConf = new JobConf(originalConf);
  433.     if (fConf.get("slave.host.name") != null) {
  434.       this.localHostname = fConf.get("slave.host.name");
  435.     }
  436.     if (localHostname == null) {
  437.       this.localHostname =
  438.       DNS.getDefaultHost
  439.       (fConf.get("mapred.tasktracker.dns.interface","default"),
  440.        fConf.get("mapred.tasktracker.dns.nameserver","default"));
  441.     }
  442.  
  443.     //check local disk
  444.     checkLocalDirs(this.fConf.getLocalDirs());
  445.     fConf.deleteLocalFiles(SUBDIR);
  446.     // Clear out state tables
  447.     this.tasks.clear();
  448.     this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
  449.     this.runningJobs = new TreeMap<JobID, RunningJob>();
  450.     this.mapTotal = 0;
  451.     this.reduceTotal = 0;
  452.     this.acceptNewTasks = true;
  453.     this.status = null;
  454.     this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
  455.     this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
  456.     //tweak the probe sample size (make it a function of numCopiers)
  457.     probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
  458.     
  459.     Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
  460.     try {
  461.       java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
  462.         metricsInst.getConstructor(new Class[] {TaskTracker.class} );
  463.       this.myInstrumentation = c.newInstance(this);
  464.     } catch(Exception e) {
  465.       //Reflection can throw lots of exceptions -- handle them all by 
  466.       //falling back on the default.
  467.       LOG.error("failed to initialize taskTracker metrics", e);
  468.       this.myInstrumentation = new TaskTrackerMetricsInst(this);
  469.     }
  470.     
  471.     // bind address
  472.     String address = 
  473.       NetUtils.getServerAddress(fConf,
  474.                                 "mapred.task.tracker.report.bindAddress", 
  475.                                 "mapred.task.tracker.report.port", 
  476.                                 "mapred.task.tracker.report.address");
  477.     InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
  478.     String bindAddress = socAddr.getHostName();
  479.     int tmpPort = socAddr.getPort();
  480.     
  481.     this.jvmManager = new JvmManager(this);
  482.     // Set service-level authorization security policy
  483.     if (this.fConf.getBoolean(
  484.           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
  485.       PolicyProvider policyProvider = 
  486.         (PolicyProvider)(ReflectionUtils.newInstance(
  487.             this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
  488.                 MapReducePolicyProvider.class, PolicyProvider.class), 
  489.             this.fConf));
  490.       SecurityUtil.setPolicy(new ConfiguredPolicy(this.fConf, policyProvider));
  491.     }
  492.     
  493.     // RPC initialization
  494.     int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
  495.                        maxCurrentMapTasks : maxCurrentReduceTasks;
  496.     //set the num handlers to max*2 since canCommit may wait for the duration
  497.     //of a heartbeat RPC
  498.     this.taskReportServer =
  499.       RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
  500.     this.taskReportServer.start();
  501.     // get the assigned address
  502.     this.taskReportAddress = taskReportServer.getListenerAddress();
  503.     this.fConf.set("mapred.task.tracker.report.address",
  504.         taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
  505.     LOG.info("TaskTracker up at: " + this.taskReportAddress);
  506.     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
  507.     LOG.info("Starting tracker " + taskTrackerName);
  508.     // Clear out temporary files that might be lying around
  509.     DistributedCache.purgeCache(this.fConf);
  510.     cleanupStorage();
  511.     this.jobClient = (InterTrackerProtocol) 
  512.       RPC.waitForProxy(InterTrackerProtocol.class,
  513.                        InterTrackerProtocol.versionID, 
  514.                        jobTrackAddr, this.fConf);
  515.     this.justInited = true;
  516.     this.running = true;    
  517.     // start the thread that will fetch map task completion events
  518.     this.mapEventsFetcher = new MapEventsFetcherThread();
  519.     mapEventsFetcher.setDaemon(true);
  520.     mapEventsFetcher.setName(
  521.                              "Map-events fetcher for all reduce tasks " + "on " + 
  522.                              taskTrackerName);
  523.     mapEventsFetcher.start();
  524.     initializeMemoryManagement();
  525.     this.indexCache = new IndexCache(this.fConf);
  526.     mapLauncher = new TaskLauncher(maxCurrentMapTasks);
  527.     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
  528.     mapLauncher.start();
  529.     reduceLauncher.start();
  530.   }
  531.   
  532.   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
  533.     return conf.getClass("mapred.tasktracker.instrumentation",
  534.         TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
  535.   }
  536.   
  537.   public static void setInstrumentationClass(Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
  538.     conf.setClass("mapred.tasktracker.instrumentation",
  539.         t, TaskTrackerInstrumentation.class);
  540.   }
  541.   
  542.   /** 
  543.    * Removes all contents of temporary storage.  Called upon 
  544.    * startup, to remove any leftovers from previous run.
  545.    */
  546.   public void cleanupStorage() throws IOException {
  547.     this.fConf.deleteLocalFiles();
  548.   }
  549.   // Object on wait which MapEventsFetcherThread is going to wait.
  550.   private Object waitingOn = new Object();
  551.   private class MapEventsFetcherThread extends Thread {
  552.     private List <FetchStatus> reducesInShuffle() {
  553.       List <FetchStatus> fList = new ArrayList<FetchStatus>();
  554.       for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
  555.         RunningJob rjob = item.getValue();
  556.         JobID jobId = item.getKey();
  557.         FetchStatus f;
  558.         synchronized (rjob) {
  559.           f = rjob.getFetchStatus();
  560.           for (TaskInProgress tip : rjob.tasks) {
  561.             Task task = tip.getTask();
  562.             if (!task.isMapTask()) {
  563.               if (((ReduceTask)task).getPhase() == 
  564.                   TaskStatus.Phase.SHUFFLE) {
  565.                 if (rjob.getFetchStatus() == null) {
  566.                   //this is a new job; we start fetching its map events
  567.                   f = new FetchStatus(jobId, 
  568.                                       ((ReduceTask)task).getNumMaps());
  569.                   rjob.setFetchStatus(f);
  570.                 }
  571.                 f = rjob.getFetchStatus();
  572.                 fList.add(f);
  573.                 break; //no need to check any more tasks belonging to this
  574.               }
  575.             }
  576.           }
  577.         }
  578.       }
  579.       //at this point, we have information about for which of
  580.       //the running jobs do we need to query the jobtracker for map 
  581.       //outputs (actually map events).
  582.       return fList;
  583.     }
  584.       
  585.     @Override
  586.     public void run() {
  587.       LOG.info("Starting thread: " + this.getName());
  588.         
  589.       while (running) {
  590.         try {
  591.           List <FetchStatus> fList = null;
  592.           synchronized (runningJobs) {
  593.             while (((fList = reducesInShuffle()).size()) == 0) {
  594.               try {
  595.                 runningJobs.wait();
  596.               } catch (InterruptedException e) {
  597.                 LOG.info("Shutting down: " + this.getName());
  598.                 return;
  599.               }
  600.             }
  601.           }
  602.           // now fetch all the map task events for all the reduce tasks
  603.           // possibly belonging to different jobs
  604.           boolean fetchAgain = false; //flag signifying whether we want to fetch
  605.                                       //immediately again.
  606.           for (FetchStatus f : fList) {
  607.             long currentTime = System.currentTimeMillis();
  608.             try {
  609.               //the method below will return true when we have not 
  610.               //fetched all available events yet
  611.               if (f.fetchMapCompletionEvents(currentTime)) {
  612.                 fetchAgain = true;
  613.               }
  614.             } catch (Exception e) {
  615.               LOG.warn(
  616.                        "Ignoring exception that fetch for map completion" +
  617.                        " events threw for " + f.jobId + " threw: " +
  618.                        StringUtils.stringifyException(e)); 
  619.             }
  620.             if (!running) {
  621.               break;
  622.             }
  623.           }
  624.           synchronized (waitingOn) {
  625.             try {
  626.               if (!fetchAgain) {
  627.                 waitingOn.wait(heartbeatInterval);
  628.               }
  629.             } catch (InterruptedException ie) {
  630.               LOG.info("Shutting down: " + this.getName());
  631.               return;
  632.             }
  633.           }
  634.         } catch (Exception e) {
  635.           LOG.info("Ignoring exception "  + e.getMessage());
  636.         }
  637.       }
  638.     } 
  639.   }
  640.   private class FetchStatus {
  641.     /** The next event ID that we will start querying the JobTracker from*/
  642.     private IntWritable fromEventId;
  643.     /** This is the cache of map events for a given job */ 
  644.     private List<TaskCompletionEvent> allMapEvents;
  645.     /** What jobid this fetchstatus object is for*/
  646.     private JobID jobId;
  647.     private long lastFetchTime;
  648.     private boolean fetchAgain;
  649.      
  650.     public FetchStatus(JobID jobId, int numMaps) {
  651.       this.fromEventId = new IntWritable(0);
  652.       this.jobId = jobId;
  653.       this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
  654.     }
  655.       
  656.     /**
  657.      * Reset the events obtained so far.
  658.      */
  659.     public void reset() {
  660.       // Note that the sync is first on fromEventId and then on allMapEvents
  661.       synchronized (fromEventId) {
  662.         synchronized (allMapEvents) {
  663.           fromEventId.set(0); // set the new index for TCE
  664.           allMapEvents.clear();
  665.         }
  666.       }
  667.     }
  668.     
  669.     public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
  670.         
  671.       TaskCompletionEvent[] mapEvents = 
  672.         TaskCompletionEvent.EMPTY_ARRAY;
  673.       boolean notifyFetcher = false; 
  674.       synchronized (allMapEvents) {
  675.         if (allMapEvents.size() > fromId) {
  676.           int actualMax = Math.min(max, (allMapEvents.size() - fromId));
  677.           List <TaskCompletionEvent> eventSublist = 
  678.             allMapEvents.subList(fromId, actualMax + fromId);
  679.           mapEvents = eventSublist.toArray(mapEvents);
  680.         } else {
  681.           // Notify Fetcher thread. 
  682.           notifyFetcher = true;
  683.         }
  684.       }
  685.       if (notifyFetcher) {
  686.         synchronized (waitingOn) {
  687.           waitingOn.notify();
  688.         }
  689.       }
  690.       return mapEvents;
  691.     }
  692.       
  693.     public boolean fetchMapCompletionEvents(long currTime) throws IOException {
  694.       if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
  695.         return false;
  696.       }
  697.       int currFromEventId = 0;
  698.       synchronized (fromEventId) {
  699.         currFromEventId = fromEventId.get();
  700.         List <TaskCompletionEvent> recentMapEvents = 
  701.           queryJobTracker(fromEventId, jobId, jobClient);
  702.         synchronized (allMapEvents) {
  703.           allMapEvents.addAll(recentMapEvents);
  704.         }
  705.         lastFetchTime = currTime;
  706.         if (fromEventId.get() - currFromEventId >= probe_sample_size) {
  707.           //return true when we have fetched the full payload, indicating
  708.           //that we should fetch again immediately (there might be more to
  709.           //fetch
  710.           fetchAgain = true;
  711.           return true;
  712.         }
  713.       }
  714.       fetchAgain = false;
  715.       return false;
  716.     }
  717.   }
  718.   private LocalDirAllocator lDirAlloc = 
  719.                               new LocalDirAllocator("mapred.local.dir");
  720.   // intialize the job directory
  721.   private void localizeJob(TaskInProgress tip) throws IOException {
  722.     Path localJarFile = null;
  723.     Task t = tip.getTask();
  724.     JobID jobId = t.getJobID();
  725.     Path jobFile = new Path(t.getJobFile());
  726.     // Get sizes of JobFile and JarFile
  727.     // sizes are -1 if they are not present.
  728.     FileStatus status = null;
  729.     long jobFileSize = -1;
  730.     try {
  731.       status = systemFS.getFileStatus(jobFile);
  732.       jobFileSize = status.getLen();
  733.     } catch(FileNotFoundException fe) {
  734.       jobFileSize = -1;
  735.     }
  736.     Path localJobFile = lDirAlloc.getLocalPathForWrite(
  737.                                     getLocalJobDir(jobId.toString())
  738.                                     + Path.SEPARATOR + "job.xml",
  739.                                     jobFileSize, fConf);
  740.     RunningJob rjob = addTaskToJob(jobId, tip);
  741.     synchronized (rjob) {
  742.       if (!rjob.localized) {
  743.   
  744.         FileSystem localFs = FileSystem.getLocal(fConf);
  745.         // this will happen on a partial execution of localizeJob.
  746.         // Sometimes the job.xml gets copied but copying job.jar
  747.         // might throw out an exception
  748.         // we should clean up and then try again
  749.         Path jobDir = localJobFile.getParent();
  750.         if (localFs.exists(jobDir)){
  751.           localFs.delete(jobDir, true);
  752.           boolean b = localFs.mkdirs(jobDir);
  753.           if (!b)
  754.             throw new IOException("Not able to create job directory "
  755.                                   + jobDir.toString());
  756.         }
  757.         systemFS.copyToLocalFile(jobFile, localJobFile);
  758.         JobConf localJobConf = new JobConf(localJobFile);
  759.         
  760.         // create the 'work' directory
  761.         // job-specific shared directory for use as scratch space 
  762.         Path workDir = lDirAlloc.getLocalPathForWrite(
  763.                          (getLocalJobDir(jobId.toString())
  764.                          + Path.SEPARATOR + "work"), fConf);
  765.         if (!localFs.mkdirs(workDir)) {
  766.           throw new IOException("Mkdirs failed to create " 
  767.                       + workDir.toString());
  768.         }
  769.         System.setProperty("job.local.dir", workDir.toString());
  770.         localJobConf.set("job.local.dir", workDir.toString());
  771.         
  772.         // copy Jar file to the local FS and unjar it.
  773.         String jarFile = localJobConf.getJar();
  774.         long jarFileSize = -1;
  775.         if (jarFile != null) {
  776.           Path jarFilePath = new Path(jarFile);
  777.           try {
  778.             status = systemFS.getFileStatus(jarFilePath);
  779.             jarFileSize = status.getLen();
  780.           } catch(FileNotFoundException fe) {
  781.             jarFileSize = -1;
  782.           }
  783.           // Here we check for and we check five times the size of jarFileSize
  784.           // to accommodate for unjarring the jar file in work directory 
  785.           localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
  786.                                      getLocalJobDir(jobId.toString())
  787.                                      + Path.SEPARATOR + "jars",
  788.                                      5 * jarFileSize, fConf), "job.jar");
  789.           if (!localFs.mkdirs(localJarFile.getParent())) {
  790.             throw new IOException("Mkdirs failed to create jars directory "); 
  791.           }
  792.           systemFS.copyToLocalFile(jarFilePath, localJarFile);
  793.           localJobConf.setJar(localJarFile.toString());
  794.           OutputStream out = localFs.create(localJobFile);
  795.           try {
  796.             localJobConf.writeXml(out);
  797.           } finally {
  798.             out.close();
  799.           }
  800.           // also unjar the job.jar files 
  801.           RunJar.unJar(new File(localJarFile.toString()),
  802.                        new File(localJarFile.getParent().toString()));
  803.         }
  804.         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
  805.                              localJobConf.getKeepFailedTaskFiles());
  806.         rjob.localized = true;
  807.         rjob.jobConf = localJobConf;
  808.       }
  809.     }
  810.     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
  811.   }
  812.   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
  813.     synchronized (tip) {
  814.       tip.setJobConf(jobConf);
  815.       tip.launchTask();
  816.     }
  817.   }
  818.     
  819.   public synchronized void shutdown() throws IOException {
  820.     shuttingDown = true;
  821.     close();
  822.     if (this.server != null) {
  823.       try {
  824.         LOG.info("Shutting down StatusHttpServer");
  825.         this.server.stop();
  826.       } catch (Exception e) {
  827.         LOG.warn("Exception shutting down TaskTracker", e);
  828.       }
  829.     }
  830.   }
  831.   /**
  832.    * Close down the TaskTracker and all its components.  We must also shutdown
  833.    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
  834.    * within the same process space might be restarted, so everything must be
  835.    * clean.
  836.    */
  837.   public synchronized void close() throws IOException {
  838.     //
  839.     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
  840.     // because calling jobHasFinished() may result in an edit to 'tasks'.
  841.     //
  842.     TreeMap<TaskAttemptID, TaskInProgress> tasksToClose =
  843.       new TreeMap<TaskAttemptID, TaskInProgress>();
  844.     tasksToClose.putAll(tasks);
  845.     for (TaskInProgress tip : tasksToClose.values()) {
  846.       tip.jobHasFinished(false);
  847.     }
  848.     
  849.     this.running = false;
  850.         
  851.     // Clear local storage
  852.     cleanupStorage();
  853.         
  854.     // Shutdown the fetcher thread
  855.     this.mapEventsFetcher.interrupt();
  856.     
  857.     //stop the launchers
  858.     this.mapLauncher.interrupt();
  859.     this.reduceLauncher.interrupt();
  860.     
  861.     jvmManager.stop();
  862.     
  863.     // shutdown RPC connections
  864.     RPC.stopProxy(jobClient);
  865.     // wait for the fetcher thread to exit
  866.     for (boolean done = false; !done; ) {
  867.       try {
  868.         this.mapEventsFetcher.join();
  869.         done = true;
  870.       } catch (InterruptedException e) {
  871.       }
  872.     }
  873.     
  874.     if (taskReportServer != null) {
  875.       taskReportServer.stop();
  876.       taskReportServer = null;
  877.     }
  878.   }
  879.   /**
  880.    * Start with the local machine name, and the default JobTracker
  881.    */
  882.   public TaskTracker(JobConf conf) throws IOException {
  883.     originalConf = conf;
  884.     maxCurrentMapTasks = conf.getInt(
  885.                   "mapred.tasktracker.map.tasks.maximum", 2);
  886.     maxCurrentReduceTasks = conf.getInt(
  887.                   "mapred.tasktracker.reduce.tasks.maximum", 2);
  888.     this.jobTrackAddr = JobTracker.getAddress(conf);
  889.     String infoAddr = 
  890.       NetUtils.getServerAddress(conf,
  891.                                 "tasktracker.http.bindAddress", 
  892.                                 "tasktracker.http.port",
  893.                                 "mapred.task.tracker.http.address");
  894.     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
  895.     String httpBindAddress = infoSocAddr.getHostName();
  896.     int httpPort = infoSocAddr.getPort();
  897.     this.server = new HttpServer("task", httpBindAddress, httpPort,
  898.         httpPort == 0, conf);
  899.     workerThreads = conf.getInt("tasktracker.http.threads", 40);
  900.     this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
  901.     server.setThreads(1, workerThreads);
  902.     // let the jsp pages get to the task tracker, config, and other relevant
  903.     // objects
  904.     FileSystem local = FileSystem.getLocal(conf);
  905.     this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
  906.     server.setAttribute("task.tracker", this);
  907.     server.setAttribute("local.file.system", local);
  908.     server.setAttribute("conf", conf);
  909.     server.setAttribute("log", LOG);
  910.     server.setAttribute("localDirAllocator", localDirAllocator);
  911.     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
  912.     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
  913.     server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
  914.     server.start();
  915.     this.httpPort = server.getPort();
  916.     initialize();
  917.   }
  918.   private void startCleanupThreads() throws IOException {
  919.     taskCleanupThread.setDaemon(true);
  920.     taskCleanupThread.start();
  921.     directoryCleanupThread = new CleanupQueue();
  922.   }
  923.   
  924.   /**
  925.    * The connection to the JobTracker, used by the TaskRunner 
  926.    * for locating remote files.
  927.    */
  928.   public InterTrackerProtocol getJobClient() {
  929.     return jobClient;
  930.   }
  931.         
  932.   /** Return the port at which the tasktracker bound to */
  933.   public synchronized InetSocketAddress getTaskTrackerReportAddress() {
  934.     return taskReportAddress;
  935.   }
  936.     
  937.   /** Queries the job tracker for a set of outputs ready to be copied
  938.    * @param fromEventId the first event ID we want to start from, this is
  939.    * modified by the call to this method
  940.    * @param jobClient the job tracker
  941.    * @return a set of locations to copy outputs from
  942.    * @throws IOException
  943.    */  
  944.   private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
  945.                                                     JobID jobId,
  946.                                                     InterTrackerProtocol jobClient)
  947.     throws IOException {
  948.     TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
  949.                                                                 jobId,
  950.                                                                 fromEventId.get(),
  951.                                                                 probe_sample_size);
  952.     //we are interested in map task completion events only. So store
  953.     //only those
  954.     List <TaskCompletionEvent> recentMapEvents = 
  955.       new ArrayList<TaskCompletionEvent>();
  956.     for (int i = 0; i < t.length; i++) {
  957.       if (t[i].isMap) {
  958.         recentMapEvents.add(t[i]);
  959.       }
  960.     }
  961.     fromEventId.set(fromEventId.get() + t.length);
  962.     return recentMapEvents;
  963.   }
  964.   /**
  965.    * Main service loop.  Will stay in this loop forever.
  966.    */
  967.   State offerService() throws Exception {
  968.     long lastHeartbeat = 0;
  969.     while (running && !shuttingDown) {
  970.       try {
  971.         long now = System.currentTimeMillis();
  972.         long waitTime = heartbeatInterval - (now - lastHeartbeat);
  973.         if (waitTime > 0) {
  974.           // sleeps for the wait time
  975.           Thread.sleep(waitTime);
  976.         }
  977.         // If the TaskTracker is just starting up:
  978.         // 1. Verify the buildVersion
  979.         // 2. Get the system directory & filesystem
  980.         if(justInited) {
  981.           String jobTrackerBV = jobClient.getBuildVersion();
  982.           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
  983.             String msg = "Shutting down. Incompatible buildVersion." +
  984.             "nJobTracker's: " + jobTrackerBV + 
  985.             "nTaskTracker's: "+ VersionInfo.getBuildVersion();
  986.             LOG.error(msg);
  987.             try {
  988.               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
  989.             } catch(Exception e ) {
  990.               LOG.info("Problem reporting to jobtracker: " + e);
  991.             }
  992.             return State.DENIED;
  993.           }
  994.           
  995.           String dir = jobClient.getSystemDir();
  996.           if (dir == null) {
  997.             throw new IOException("Failed to get system directory");
  998.           }
  999.           systemDirectory = new Path(dir);
  1000.           systemFS = systemDirectory.getFileSystem(fConf);
  1001.         }
  1002.         
  1003.         // Send the heartbeat and process the jobtracker's directives
  1004.         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
  1005.         // Note the time when the heartbeat returned, use this to decide when to send the
  1006.         // next heartbeat   
  1007.         lastHeartbeat = System.currentTimeMillis();
  1008.         
  1009.         
  1010.         // Check if the map-event list needs purging
  1011.         Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
  1012.         if (jobs.size() > 0) {
  1013.           synchronized (this) {
  1014.             // purge the local map events list
  1015.             for (JobID job : jobs) {
  1016.               RunningJob rjob;
  1017.               synchronized (runningJobs) {
  1018.                 rjob = runningJobs.get(job);          
  1019.                 if (rjob != null) {
  1020.                   synchronized (rjob) {
  1021.                     FetchStatus f = rjob.getFetchStatus();
  1022.                     if (f != null) {
  1023.                       f.reset();
  1024.                     }
  1025.                   }
  1026.                 }
  1027.               }
  1028.             }
  1029.             // Mark the reducers in shuffle for rollback
  1030.             synchronized (shouldReset) {
  1031.               for (Map.Entry<TaskAttemptID, TaskInProgress> entry 
  1032.                    : runningTasks.entrySet()) {
  1033.                 if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
  1034.                   this.shouldReset.add(entry.getKey());
  1035.                 }
  1036.               }
  1037.             }
  1038.           }
  1039.         }
  1040.         
  1041.         TaskTrackerAction[] actions = heartbeatResponse.getActions();
  1042.         if(LOG.isDebugEnabled()) {
  1043.           LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
  1044.                     heartbeatResponse.getResponseId() + " and " + 
  1045.                     ((actions != null) ? actions.length : 0) + " actions");
  1046.         }
  1047.         if (reinitTaskTracker(actions)) {
  1048.           return State.STALE;
  1049.         }
  1050.             
  1051.         // resetting heartbeat interval from the response.
  1052.         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
  1053.         justStarted = false;
  1054.         justInited = false;
  1055.         if (actions != null){ 
  1056.           for(TaskTrackerAction action: actions) {
  1057.             if (action instanceof LaunchTaskAction) {
  1058.               addToTaskQueue((LaunchTaskAction)action);
  1059.             } else if (action instanceof CommitTaskAction) {
  1060.               CommitTaskAction commitAction = (CommitTaskAction)action;
  1061.               if (!commitResponses.contains(commitAction.getTaskID())) {
  1062.                 LOG.info("Received commit task action for " + 
  1063.                           commitAction.getTaskID());
  1064.                 commitResponses.add(commitAction.getTaskID());
  1065.               }
  1066.             } else {
  1067.               tasksToCleanup.put(action);
  1068.             }
  1069.           }
  1070.         }
  1071.         markUnresponsiveTasks();
  1072.         killOverflowingTasks();
  1073.             
  1074.         //we've cleaned up, resume normal operation
  1075.         if (!acceptNewTasks && isIdle()) {
  1076.           acceptNewTasks=true;
  1077.         }
  1078.       } catch (InterruptedException ie) {
  1079.         LOG.info("Interrupted. Closing down.");
  1080.         return State.INTERRUPTED;
  1081.       } catch (DiskErrorException de) {
  1082.         String msg = "Exiting task tracker for disk error:n" +
  1083.           StringUtils.stringifyException(de);
  1084.         LOG.error(msg);
  1085.         synchronized (this) {
  1086.           jobClient.reportTaskTrackerError(taskTrackerName, 
  1087.                                            "DiskErrorException", msg);
  1088.         }
  1089.         return State.STALE;
  1090.       } catch (RemoteException re) {
  1091.         String reClass = re.getClassName();
  1092.         if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
  1093.           LOG.info("Tasktracker disallowed by JobTracker.");
  1094.           return State.DENIED;
  1095.         }
  1096.       } catch (Exception except) {
  1097.         String msg = "Caught exception: " + 
  1098.           StringUtils.stringifyException(except);
  1099.         LOG.error(msg);
  1100.       }
  1101.     }
  1102.     return State.NORMAL;
  1103.   }
  1104.   private long previousUpdate = 0;
  1105.   /**
  1106.    * Build and transmit the heart beat to the JobTracker
  1107.    * @param now current time
  1108.    * @return false if the tracker was unknown
  1109.    * @throws IOException
  1110.    */
  1111.   private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  1112.     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
  1113.     boolean sendCounters;
  1114.     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
  1115.       sendCounters = true;
  1116.       previousUpdate = now;
  1117.     }
  1118.     else {
  1119.       sendCounters = false;
  1120.     }
  1121.     // 
  1122.     // Check if the last heartbeat got through... 
  1123.     // if so then build the heartbeat information for the JobTracker;
  1124.     // else resend the previous status information.
  1125.     //
  1126.     if (status == null) {
  1127.       synchronized (this) {
  1128.         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
  1129.                                        httpPort, 
  1130.                                        cloneAndResetRunningTaskStatuses(
  1131.                                          sendCounters), 
  1132.                                        failures, 
  1133.                                        maxCurrentMapTasks,
  1134.                                        maxCurrentReduceTasks); 
  1135.       }
  1136.     } else {
  1137.       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
  1138.                "' with reponseId '" + heartbeatResponseId);
  1139.     }
  1140.       
  1141.     //
  1142.     // Check if we should ask for a new Task
  1143.     //
  1144.     boolean askForNewTask;
  1145.     long localMinSpaceStart;
  1146.     synchronized (this) {
  1147.       askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || 
  1148.                        status.countReduceTasks() < maxCurrentReduceTasks) &&
  1149.                       acceptNewTasks; 
  1150.       localMinSpaceStart = minSpaceStart;
  1151.     }
  1152.     if (askForNewTask) {
  1153.       checkLocalDirs(fConf.getLocalDirs());
  1154.       askForNewTask = enoughFreeSpace(localMinSpaceStart);
  1155.       long freeDiskSpace = getFreeSpace();
  1156.       long totVmem = getTotalVirtualMemoryOnTT();
  1157.       long totPmem = getTotalPhysicalMemoryOnTT();
  1158.       long rsrvdVmem = getReservedVirtualMemory();
  1159.       long rsrvdPmem = getReservedPhysicalMemory();
  1160.       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
  1161.       status.getResourceStatus().setTotalVirtualMemory(totVmem);
  1162.       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
  1163.       status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
  1164.       status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
  1165.     }
  1166.       
  1167.     //
  1168.     // Xmit the heartbeat
  1169.     //
  1170.     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
  1171.                                                               justStarted,
  1172.                                                               justInited,
  1173.                                                               askForNewTask, 
  1174.                                                               heartbeatResponseId);
  1175.       
  1176.     //
  1177.     // The heartbeat got through successfully!
  1178.     //
  1179.     heartbeatResponseId = heartbeatResponse.getResponseId();
  1180.       
  1181.     synchronized (this) {
  1182.       for (TaskStatus taskStatus : status.getTaskReports()) {
  1183.         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
  1184.             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
  1185.             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
  1186.             !taskStatus.inTaskCleanupPhase()) {
  1187.           if (taskStatus.getIsMap()) {
  1188.             mapTotal--;
  1189.           } else {
  1190.             reduceTotal--;
  1191.           }
  1192.           try {
  1193.             myInstrumentation.completeTask(taskStatus.getTaskID());
  1194.           } catch (MetricsException me) {
  1195.             LOG.warn("Caught: " + StringUtils.stringifyException(me));
  1196.           }
  1197.           runningTasks.remove(taskStatus.getTaskID());
  1198.         }
  1199.       }
  1200.       
  1201.       // Clear transient status information which should only
  1202.       // be sent once to the JobTracker
  1203.       for (TaskInProgress tip: runningTasks.values()) {
  1204.         tip.getStatus().clearStatus();
  1205.       }
  1206.     }
  1207.     // Force a rebuild of 'status' on the next iteration
  1208.     status = null;                                
  1209.     return heartbeatResponse;
  1210.   }
  1211.   /**
  1212.    * Return the total virtual memory available on this TaskTracker.
  1213.    * @return total size of virtual memory.
  1214.    */
  1215.   long getTotalVirtualMemoryOnTT() {
  1216.     return totalVirtualMemoryOnTT;
  1217.   }
  1218.   /**
  1219.    * Return the total physical memory available on this TaskTracker.
  1220.    * @return total size of physical memory.
  1221.    */
  1222.   long getTotalPhysicalMemoryOnTT() {
  1223.     return totalPmemOnTT;
  1224.   }
  1225.   /**
  1226.    * Return the amount of virtual memory reserved on the TaskTracker for system
  1227.    * usage (OS, TT etc).
  1228.    */
  1229.   long getReservedVirtualMemory() {
  1230.     return reservedVirtualMemory;
  1231.   }
  1232.   /**
  1233.    * Return the amount of physical memory reserved on the TaskTracker for system
  1234.    * usage (OS, TT etc).
  1235.    */
  1236.   long getReservedPhysicalMemory() {
  1237.     return reservedPmem;
  1238.   }
  1239.   /**
  1240.    * Return the limit on the maxVMemPerTask on this TaskTracker
  1241.    * @return limitMaxVmPerTask
  1242.    */
  1243.   long getLimitMaxVMemPerTask() {
  1244.     return limitMaxVmPerTask;
  1245.   }
  1246.   /**
  1247.    * Obtain the virtual memory allocated for a TIP.
  1248.    * 
  1249.    * If the TIP's job has a configured value for the max-virtual memory, that
  1250.    * will be returned. Else, the cluster-wide default maxvirtual memory for
  1251.    * tasks is returned.
  1252.    * 
  1253.    * @param conf
  1254.    * @return the virtual memory allocated for the TIP.
  1255.    */
  1256.   long getVirtualMemoryForTask(JobConf conf) {
  1257.     long vMemForTask =
  1258.         normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
  1259.     if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
  1260.       vMemForTask =
  1261.           normalizeMemoryConfigValue(fConf.getLong(
  1262.               JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1263.               JobConf.DISABLED_MEMORY_LIMIT));
  1264.     }
  1265.     return vMemForTask;
  1266.   }
  1267.   /**
  1268.    * Check if the jobtracker directed a 'reset' of the tasktracker.
  1269.    * 
  1270.    * @param actions the directives of the jobtracker for the tasktracker.
  1271.    * @return <code>true</code> if tasktracker is to be reset, 
  1272.    *         <code>false</code> otherwise.
  1273.    */
  1274.   private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
  1275.     if (actions != null) {
  1276.       for (TaskTrackerAction action : actions) {
  1277.         if (action.getActionId() == 
  1278.             TaskTrackerAction.ActionType.REINIT_TRACKER) {
  1279.           LOG.info("Recieved RenitTrackerAction from JobTracker");
  1280.           return true;
  1281.         }
  1282.       }
  1283.     }
  1284.     return false;
  1285.   }
  1286.     
  1287.   /**
  1288.    * Kill any tasks that have not reported progress in the last X seconds.
  1289.    */
  1290.   private synchronized void markUnresponsiveTasks() throws IOException {
  1291.     long now = System.currentTimeMillis();
  1292.     for (TaskInProgress tip: runningTasks.values()) {
  1293.       if (tip.getRunState() == TaskStatus.State.RUNNING ||
  1294.           tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
  1295.           tip.isCleaningup()) {
  1296.         // Check the per-job timeout interval for tasks;
  1297.         // an interval of '0' implies it is never timed-out
  1298.         long jobTaskTimeout = tip.getTaskTimeout();
  1299.         if (jobTaskTimeout == 0) {
  1300.           continue;
  1301.         }
  1302.           
  1303.         // Check if the task has not reported progress for a 
  1304.         // time-period greater than the configured time-out
  1305.         long timeSinceLastReport = now - tip.getLastProgressReport();
  1306.         if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
  1307.           String msg = 
  1308.             "Task " + tip.getTask().getTaskID() + " failed to report status for " 
  1309.             + (timeSinceLastReport / 1000) + " seconds. Killing!";
  1310.           LOG.info(tip.getTask().getTaskID() + ": " + msg);
  1311.           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
  1312.           tip.reportDiagnosticInfo(msg);
  1313.           myInstrumentation.timedoutTask(tip.getTask().getTaskID());
  1314.           purgeTask(tip, true);
  1315.         }
  1316.       }
  1317.     }
  1318.   }
  1319.   /**
  1320.    * The task tracker is done with this job, so we need to clean up.
  1321.    * @param action The action with the job
  1322.    * @throws IOException
  1323.    */
  1324.   private synchronized void purgeJob(KillJobAction action) throws IOException {
  1325.     JobID jobId = action.getJobID();
  1326.     LOG.info("Received 'KillJobAction' for job: " + jobId);
  1327.     RunningJob rjob = null;
  1328.     synchronized (runningJobs) {
  1329.       rjob = runningJobs.get(jobId);
  1330.     }
  1331.       
  1332.     if (rjob == null) {
  1333.       LOG.warn("Unknown job " + jobId + " being deleted.");
  1334.     } else {
  1335.       synchronized (rjob) {            
  1336.         // Add this tips of this job to queue of tasks to be purged 
  1337.         for (TaskInProgress tip : rjob.tasks) {
  1338.           tip.jobHasFinished(false);
  1339.           Task t = tip.getTask();
  1340.           if (t.isMapTask()) {
  1341.             indexCache.removeMap(tip.getTask().getTaskID().toString());
  1342.           }
  1343.         }
  1344.         // Delete the job directory for this  
  1345.         // task if the job is done/failed
  1346.         if (!rjob.keepJobFiles){
  1347.           directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
  1348.             getLocalJobDir(rjob.getJobID().toString())));
  1349.         }
  1350.         // Remove this job 
  1351.         rjob.tasks.clear();
  1352.       }
  1353.     }
  1354.     synchronized(runningJobs) {
  1355.       runningJobs.remove(jobId);
  1356.     }
  1357.   }      
  1358.     
  1359.     
  1360.   /**
  1361.    * Remove the tip and update all relevant state.
  1362.    * 
  1363.    * @param tip {@link TaskInProgress} to be removed.
  1364.    * @param wasFailure did the task fail or was it killed?
  1365.    */
  1366.   private void purgeTask(TaskInProgress tip, boolean wasFailure) 
  1367.   throws IOException {
  1368.     if (tip != null) {
  1369.       LOG.info("About to purge task: " + tip.getTask().getTaskID());
  1370.         
  1371.       // Remove the task from running jobs, 
  1372.       // removing the job if it's the last task
  1373.       removeTaskFromJob(tip.getTask().getJobID(), tip);
  1374.       tip.jobHasFinished(wasFailure);
  1375.       if (tip.getTask().isMapTask()) {
  1376.         indexCache.removeMap(tip.getTask().getTaskID().toString());
  1377.       }
  1378.     }
  1379.   }
  1380.   /** Check if we're dangerously low on disk space
  1381.    * If so, kill jobs to free up space and make sure
  1382.    * we don't accept any new tasks
  1383.    * Try killing the reduce jobs first, since I believe they
  1384.    * use up most space
  1385.    * Then pick the one with least progress
  1386.    */
  1387.   private void killOverflowingTasks() throws IOException {
  1388.     long localMinSpaceKill;
  1389.     synchronized(this){
  1390.       localMinSpaceKill = minSpaceKill;  
  1391.     }
  1392.     if (!enoughFreeSpace(localMinSpaceKill)) {
  1393.       acceptNewTasks=false; 
  1394.       //we give up! do not accept new tasks until
  1395.       //all the ones running have finished and they're all cleared up
  1396.       synchronized (this) {
  1397.         TaskInProgress killMe = findTaskToKill(null);
  1398.         if (killMe!=null) {
  1399.           String msg = "Tasktracker running out of space." +
  1400.             " Killing task.";
  1401.           LOG.info(killMe.getTask().getTaskID() + ": " + msg);
  1402.           killMe.reportDiagnosticInfo(msg);
  1403.           purgeTask(killMe, false);
  1404.         }
  1405.       }
  1406.     }
  1407.   }
  1408.   /**
  1409.    * Pick a task to kill to free up memory/disk-space 
  1410.    * @param tasksToExclude tasks that are to be excluded while trying to find a
  1411.    *          task to kill. If null, all runningTasks will be searched.
  1412.    * @return the task to kill or null, if one wasn't found
  1413.    */
  1414.   synchronized TaskInProgress findTaskToKill(List<TaskAttemptID> tasksToExclude) {
  1415.     TaskInProgress killMe = null;
  1416.     for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
  1417.       TaskInProgress tip = (TaskInProgress) it.next();
  1418.       if (tasksToExclude != null
  1419.           && tasksToExclude.contains(tip.getTask().getTaskID())) {
  1420.         // exclude this task
  1421.         continue;
  1422.       }
  1423.       if ((tip.getRunState() == TaskStatus.State.RUNNING ||
  1424.            tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
  1425.           !tip.wasKilled) {
  1426.                 
  1427.         if (killMe == null) {
  1428.           killMe = tip;
  1429.         } else if (!tip.getTask().isMapTask()) {
  1430.           //reduce task, give priority
  1431.           if (killMe.getTask().isMapTask() || 
  1432.               (tip.getTask().getProgress().get() < 
  1433.                killMe.getTask().getProgress().get())) {
  1434.             killMe = tip;
  1435.           }
  1436.         } else if (killMe.getTask().isMapTask() &&
  1437.                    tip.getTask().getProgress().get() < 
  1438.                    killMe.getTask().getProgress().get()) {
  1439.           //map task, only add if the progress is lower
  1440.           killMe = tip;
  1441.         }
  1442.       }
  1443.     }
  1444.     return killMe;
  1445.   }
  1446.   /**
  1447.    * Check if any of the local directories has enough
  1448.    * free space  (more than minSpace)
  1449.    * 
  1450.    * If not, do not try to get a new task assigned 
  1451.    * @return
  1452.    * @throws IOException 
  1453.    */
  1454.   private boolean enoughFreeSpace(long minSpace) throws IOException {
  1455.     if (minSpace == 0) {
  1456.       return true;
  1457.     }
  1458.     return minSpace < getFreeSpace();
  1459.   }
  1460.   
  1461.   private long getFreeSpace() throws IOException {
  1462.     long biggestSeenSoFar = 0;
  1463.     String[] localDirs = fConf.getLocalDirs();
  1464.     for (int i = 0; i < localDirs.length; i++) {
  1465.       DF df = null;
  1466.       if (localDirsDf.containsKey(localDirs[i])) {
  1467.         df = localDirsDf.get(localDirs[i]);
  1468.       } else {
  1469.         df = new DF(new File(localDirs[i]), fConf);
  1470.         localDirsDf.put(localDirs[i], df);
  1471.       }
  1472.       long availOnThisVol = df.getAvailable();
  1473.       if (availOnThisVol > biggestSeenSoFar) {
  1474.         biggestSeenSoFar = availOnThisVol;
  1475.       }
  1476.     }
  1477.     
  1478.     //Should ultimately hold back the space we expect running tasks to use but 
  1479.     //that estimate isn't currently being passed down to the TaskTrackers    
  1480.     return biggestSeenSoFar;
  1481.   }
  1482.     
  1483.   /**
  1484.    * Try to get the size of output for this task.
  1485.    * Returns -1 if it can't be found.
  1486.    * @return
  1487.    */
  1488.   long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) {
  1489.     
  1490.     try{
  1491.       TaskInProgress tip;
  1492.       synchronized(this) {
  1493.         tip = tasks.get(taskId);
  1494.       }
  1495.       if(tip == null)
  1496.          return -1;
  1497.       
  1498.       if (!tip.getTask().isMapTask() || 
  1499.           tip.getRunState() != TaskStatus.State.SUCCEEDED) {
  1500.         return -1;
  1501.       }
  1502.       
  1503.       MapOutputFile mapOutputFile = new MapOutputFile();
  1504.       mapOutputFile.setJobId(taskId.getJobID());
  1505.       mapOutputFile.setConf(conf);
  1506.       
  1507.       Path tmp_output =  mapOutputFile.getOutputFile(taskId);
  1508.       if(tmp_output == null)
  1509.         return 0;
  1510.       FileSystem localFS = FileSystem.getLocal(conf);
  1511.       FileStatus stat = localFS.getFileStatus(tmp_output);
  1512.       if(stat == null)
  1513.         return 0;
  1514.       else
  1515.         return stat.getLen();
  1516.     } catch(IOException e) {
  1517.       LOG.info(e);
  1518.       return -1;
  1519.     }
  1520.   }
  1521.   private TaskLauncher mapLauncher;
  1522.   private TaskLauncher reduceLauncher;
  1523.       
  1524.   public JvmManager getJvmManagerInstance() {
  1525.     return jvmManager;
  1526.   }
  1527.   
  1528.   private void addToTaskQueue(LaunchTaskAction action) {
  1529.     if (action.getTask().isMapTask()) {
  1530.       mapLauncher.addToTaskQueue(action);
  1531.     } else {
  1532.       reduceLauncher.addToTaskQueue(action);
  1533.     }
  1534.   }
  1535.   
  1536.   private class TaskLauncher extends Thread {
  1537.     private IntWritable numFreeSlots;
  1538.     private final int maxSlots;
  1539.     private List<TaskInProgress> tasksToLaunch;
  1540.     public TaskLauncher(int numSlots) {
  1541.       this.maxSlots = numSlots;
  1542.       this.numFreeSlots = new IntWritable(numSlots);
  1543.       this.tasksToLaunch = new LinkedList<TaskInProgress>();
  1544.       setDaemon(true);
  1545.       setName("TaskLauncher for task");
  1546.     }
  1547.     public void addToTaskQueue(LaunchTaskAction action) {
  1548.       synchronized (tasksToLaunch) {
  1549.         TaskInProgress tip = registerTask(action, this);
  1550.         tasksToLaunch.add(tip);
  1551.         tasksToLaunch.notifyAll();
  1552.       }
  1553.     }
  1554.     
  1555.     public void cleanTaskQueue() {
  1556.       tasksToLaunch.clear();
  1557.     }
  1558.     
  1559.     public void addFreeSlot() {
  1560.       synchronized (numFreeSlots) {
  1561.         numFreeSlots.set(numFreeSlots.get() + 1);
  1562.         assert (numFreeSlots.get() <= maxSlots);
  1563.         LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
  1564.         numFreeSlots.notifyAll();
  1565.       }
  1566.     }
  1567.     
  1568.     public void run() {
  1569.       while (!Thread.interrupted()) {
  1570.         try {
  1571.           TaskInProgress tip;
  1572.           synchronized (tasksToLaunch) {
  1573.             while (tasksToLaunch.isEmpty()) {
  1574.               tasksToLaunch.wait();
  1575.             }
  1576.             //get the TIP
  1577.             tip = tasksToLaunch.remove(0);
  1578.             LOG.info("Trying to launch : " + tip.getTask().getTaskID());
  1579.           }
  1580.           //wait for a slot to run
  1581.           synchronized (numFreeSlots) {
  1582.             while (numFreeSlots.get() == 0) {
  1583.               numFreeSlots.wait();
  1584.             }
  1585.             LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
  1586.                 " and trying to launch "+tip.getTask().getTaskID());
  1587.             numFreeSlots.set(numFreeSlots.get() - 1);
  1588.             assert (numFreeSlots.get() >= 0);
  1589.           }
  1590.           synchronized (tip) {
  1591.             //to make sure that there is no kill task action for this
  1592.             if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
  1593.                 tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
  1594.                 tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
  1595.               //got killed externally while still in the launcher queue
  1596.               addFreeSlot();
  1597.               continue;
  1598.             }
  1599.             tip.slotTaken = true;
  1600.           }
  1601.           //got a free slot. launch the task
  1602.           startNewTask(tip);
  1603.         } catch (InterruptedException e) { 
  1604.           return; // ALL DONE
  1605.         } catch (Throwable th) {
  1606.           LOG.error("TaskLauncher error " + 
  1607.               StringUtils.stringifyException(th));
  1608.         }
  1609.       }
  1610.     }
  1611.   }
  1612.   private TaskInProgress registerTask(LaunchTaskAction action, 
  1613.       TaskLauncher launcher) {
  1614.     Task t = action.getTask();
  1615.     LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
  1616.              " task's state:" + t.getState());
  1617.     TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
  1618.     synchronized (this) {
  1619.       tasks.put(t.getTaskID(), tip);
  1620.       runningTasks.put(t.getTaskID(), tip);
  1621.       boolean isMap = t.isMapTask();
  1622.       if (isMap) {
  1623.         mapTotal++;
  1624.       } else {
  1625.         reduceTotal++;
  1626.       }
  1627.     }
  1628.     return tip;
  1629.   }
  1630.   /**
  1631.    * Start a new task.
  1632.    * All exceptions are handled locally, so that we don't mess up the
  1633.    * task tracker.
  1634.    */
  1635.   private void startNewTask(TaskInProgress tip) {
  1636.     try {
  1637.       localizeJob(tip);
  1638.     } catch (Throwable e) {
  1639.       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
  1640.                     ":n" + StringUtils.stringifyException(e));
  1641.       LOG.warn(msg);
  1642.       tip.reportDiagnosticInfo(msg);
  1643.       try {
  1644.         tip.kill(true);
  1645.         tip.cleanup(true);
  1646.       } catch (IOException ie2) {
  1647.         LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":n" +
  1648.                  StringUtils.stringifyException(ie2));          
  1649.       }
  1650.         
  1651.       // Careful! 
  1652.       // This might not be an 'Exception' - don't handle 'Error' here!
  1653.       if (e instanceof Error) {
  1654.         throw ((Error) e);
  1655.       }
  1656.     }
  1657.   }
  1658.   
  1659.   void addToMemoryManager(TaskAttemptID attemptId, 
  1660.                           JobConf conf, 
  1661.                           String pidFile) {
  1662.     if (isTaskMemoryManagerEnabled()) {
  1663.       taskMemoryManager.addTask(attemptId, 
  1664.         getVirtualMemoryForTask(conf), pidFile);
  1665.     }
  1666.   }
  1667.   void removeFromMemoryManager(TaskAttemptID attemptId) {
  1668.     // Remove the entry from taskMemoryManagerThread's data structures.
  1669.     if (isTaskMemoryManagerEnabled()) {
  1670.       taskMemoryManager.removeTask(attemptId);
  1671.     }
  1672.   }
  1673.   /**
  1674.    * The server retry loop.  
  1675.    * This while-loop attempts to connect to the JobTracker.  It only 
  1676.    * loops when the old TaskTracker has gone bad (its state is
  1677.    * stale somehow) and we need to reinitialize everything.
  1678.    */
  1679.   public void run() {
  1680.     try {
  1681.       startCleanupThreads();
  1682.       boolean denied = false;
  1683.       while (running && !shuttingDown && !denied) {
  1684.         boolean staleState = false;
  1685.         try {
  1686.           // This while-loop attempts reconnects if we get network errors
  1687.           while (running && !staleState && !shuttingDown && !denied) {
  1688.             try {
  1689.               State osState = offerService();
  1690.               if (osState == State.STALE) {
  1691.                 staleState = true;
  1692.               } else if (osState == State.DENIED) {
  1693.                 denied = true;
  1694.               }
  1695.             } catch (Exception ex) {
  1696.               if (!shuttingDown) {
  1697.                 LOG.info("Lost connection to JobTracker [" +
  1698.                          jobTrackAddr + "].  Retrying...", ex);
  1699.                 try {
  1700.                   Thread.sleep(5000);
  1701.                 } catch (InterruptedException ie) {
  1702.                 }
  1703.               }
  1704.             }
  1705.           }
  1706.         } finally {
  1707.           close();
  1708.         }
  1709.         if (shuttingDown) { return; }
  1710.         LOG.warn("Reinitializing local state");
  1711.         initialize();
  1712.       }
  1713.       if (denied) {
  1714.         shutdown();
  1715.       }
  1716.     } catch (IOException iex) {
  1717.       LOG.error("Got fatal exception while reinitializing TaskTracker: " +
  1718.                 StringUtils.stringifyException(iex));
  1719.       return;
  1720.     }
  1721.   }
  1722.     
  1723.   ///////////////////////////////////////////////////////
  1724.   // TaskInProgress maintains all the info for a Task that
  1725.   // lives at this TaskTracker.  It maintains the Task object,
  1726.   // its TaskStatus, and the TaskRunner.
  1727.   ///////////////////////////////////////////////////////
  1728.   class TaskInProgress {
  1729.     Task task;
  1730.     long lastProgressReport;
  1731.     StringBuffer diagnosticInfo = new StringBuffer();
  1732.     private TaskRunner runner;
  1733.     volatile boolean done = false;
  1734.     volatile boolean wasKilled = false;
  1735.     private JobConf defaultJobConf;
  1736.     private JobConf localJobConf;
  1737.     private boolean keepFailedTaskFiles;
  1738.     private boolean alwaysKeepTaskFiles;
  1739.     private TaskStatus taskStatus; 
  1740.     private long taskTimeout;
  1741.     private String debugCommand;
  1742.     private volatile boolean slotTaken = false;
  1743.     private TaskLauncher launcher;
  1744.         
  1745.     /**
  1746.      */
  1747.     public TaskInProgress(Task task, JobConf conf) {
  1748.       this(task, conf, null);
  1749.     }
  1750.     
  1751.     public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
  1752.       this.task = task;
  1753.       this.launcher = launcher;
  1754.       this.lastProgressReport = System.currentTimeMillis();
  1755.       this.defaultJobConf = conf;
  1756.       localJobConf = null;
  1757.       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
  1758.                                                0.0f, 
  1759.                                                task.getState(),
  1760.                                                diagnosticInfo.toString(), 
  1761.                                                "initializing",  
  1762.                                                getName(), 
  1763.                                                task.isTaskCleanupTask() ? 
  1764.                                                  TaskStatus.Phase.CLEANUP :  
  1765.                                                task.isMapTask()? TaskStatus.Phase.MAP:
  1766.                                                TaskStatus.Phase.SHUFFLE,
  1767.                                                task.getCounters()); 
  1768.       taskTimeout = (10 * 60 * 1000);
  1769.     }
  1770.         
  1771.     private void localizeTask(Task task) throws IOException{
  1772.       Path localTaskDir = 
  1773.         lDirAlloc.getLocalPathForWrite(
  1774.           TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
  1775.             task.getTaskID().toString(), task.isTaskCleanupTask()), 
  1776.           defaultJobConf );
  1777.       
  1778.       FileSystem localFs = FileSystem.getLocal(fConf);
  1779.       if (!localFs.mkdirs(localTaskDir)) {
  1780.         throw new IOException("Mkdirs failed to create " 
  1781.                     + localTaskDir.toString());
  1782.       }
  1783.       // create symlink for ../work if it already doesnt exist
  1784.       String workDir = lDirAlloc.getLocalPathToRead(
  1785.                          TaskTracker.getLocalJobDir(task.getJobID().toString())
  1786.                          + Path.SEPARATOR  
  1787.                          + "work", defaultJobConf).toString();
  1788.       String link = localTaskDir.getParent().toString() 
  1789.                       + Path.SEPARATOR + "work";
  1790.       File flink = new File(link);
  1791.       if (!flink.exists())
  1792.         FileUtil.symLink(workDir, link);
  1793.       
  1794.       // create the working-directory of the task 
  1795.       Path cwd = lDirAlloc.getLocalPathForWrite(
  1796.                    getLocalTaskDir(task.getJobID().toString(), 
  1797.                       task.getTaskID().toString(), task.isTaskCleanupTask()) 
  1798.                    + Path.SEPARATOR + MRConstants.WORKDIR,
  1799.                    defaultJobConf);
  1800.       if (!localFs.mkdirs(cwd)) {
  1801.         throw new IOException("Mkdirs failed to create " 
  1802.                     + cwd.toString());
  1803.       }
  1804.       Path localTaskFile = new Path(localTaskDir, "job.xml");
  1805.       task.setJobFile(localTaskFile.toString());
  1806.       localJobConf.set("mapred.local.dir",
  1807.                        fConf.get("mapred.local.dir"));
  1808.       if (fConf.get("slave.host.name") != null) {
  1809.         localJobConf.set("slave.host.name",
  1810.                          fConf.get("slave.host.name"));
  1811.       }
  1812.             
  1813.       localJobConf.set("mapred.task.id", task.getTaskID().toString());
  1814.       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
  1815.       task.localizeConfiguration(localJobConf);
  1816.       
  1817.       List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
  1818.       if (staticResolutions != null && staticResolutions.size() > 0) {
  1819.         StringBuffer str = new StringBuffer();
  1820.         for (int i = 0; i < staticResolutions.size(); i++) {
  1821.           String[] hostToResolved = staticResolutions.get(i);
  1822.           str.append(hostToResolved[0]+"="+hostToResolved[1]);
  1823.           if (i != staticResolutions.size() - 1) {
  1824.             str.append(',');
  1825.           }
  1826.         }
  1827.         localJobConf.set("hadoop.net.static.resolutions", str.toString());
  1828.       }
  1829.       if (task.isMapTask()) {
  1830.         debugCommand = localJobConf.getMapDebugScript();
  1831.       } else {
  1832.         debugCommand = localJobConf.getReduceDebugScript();
  1833.       }
  1834.       String keepPattern = localJobConf.getKeepTaskFilesPattern();
  1835.       if (keepPattern != null) {
  1836.         alwaysKeepTaskFiles = 
  1837.           Pattern.matches(keepPattern, task.getTaskID().toString());
  1838.       } else {
  1839.         alwaysKeepTaskFiles = false;
  1840.       }
  1841.       if (debugCommand != null || localJobConf.getProfileEnabled() ||
  1842.           alwaysKeepTaskFiles || keepFailedTaskFiles) {
  1843.         //disable jvm reuse
  1844.         localJobConf.setNumTasksToExecutePerJvm(1);
  1845.       }
  1846.       if (isTaskMemoryManagerEnabled()) {
  1847.         localJobConf.setBoolean("task.memory.mgmt.enabled", true);
  1848.       }
  1849.       OutputStream out = localFs.create(localTaskFile);
  1850.       try {
  1851.         localJobConf.writeXml(out);
  1852.       } finally {
  1853.         out.close();
  1854.       }
  1855.       task.setConf(localJobConf);
  1856.     }
  1857.         
  1858.     /**
  1859.      */
  1860.     public Task getTask() {
  1861.       return task;
  1862.     }
  1863.     
  1864.     public TaskRunner getTaskRunner() {
  1865.       return runner;
  1866.     }
  1867.     public synchronized void setJobConf(JobConf lconf){
  1868.       this.localJobConf = lconf;
  1869.       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
  1870.       taskTimeout = localJobConf.getLong("mapred.task.timeout", 
  1871.                                          10 * 60 * 1000);
  1872.     }
  1873.         
  1874.     public synchronized JobConf getJobConf() {
  1875.       return localJobConf;
  1876.     }
  1877.         
  1878.     /**
  1879.      */
  1880.     public synchronized TaskStatus getStatus() {
  1881.       taskStatus.setDiagnosticInfo(diagnosticInfo.toString());
  1882.       if (diagnosticInfo.length() > 0) {
  1883.         diagnosticInfo = new StringBuffer();
  1884.       }
  1885.       
  1886.       return taskStatus;
  1887.     }
  1888.     /**
  1889.      * Kick off the task execution
  1890.      */
  1891.     public synchronized void launchTask() throws IOException {
  1892.       if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
  1893.           this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
  1894.           this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
  1895.         localizeTask(task);
  1896.         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
  1897.           this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  1898.         }
  1899.         this.runner = task.createRunner(TaskTracker.this, this);
  1900.         this.runner.start();
  1901.         this.taskStatus.setStartTime(System.currentTimeMillis());
  1902.       } else {
  1903.         LOG.info("Not launching task: " + task.getTaskID() + 
  1904.             " since it's state is " + this.taskStatus.getRunState());
  1905.       }
  1906.     }
  1907.     boolean isCleaningup() {
  1908.       return this.taskStatus.inTaskCleanupPhase();
  1909.     }
  1910.     
  1911.     /**
  1912.      * The task is reporting its progress
  1913.      */
  1914.     public synchronized void reportProgress(TaskStatus taskStatus) 
  1915.     {
  1916.       LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + 
  1917.           "% " + taskStatus.getStateString());
  1918.       // task will report its state as
  1919.       // COMMIT_PENDING when it is waiting for commit response and 
  1920.       // when it is committing.
  1921.       // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
  1922.       if (this.done || 
  1923.           (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
  1924.           this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
  1925.           !isCleaningup()) ||
  1926.           ((this.taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
  1927.            this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
  1928.            this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) &&
  1929.            taskStatus.getRunState() == TaskStatus.State.RUNNING)) {
  1930.         //make sure we ignore progress messages after a task has 
  1931.         //invoked TaskUmbilicalProtocol.done() or if the task has been
  1932.         //KILLED/FAILED/FAILED_UNCLEAN/KILLED_UNCLEAN
  1933.         //Also ignore progress update if the state change is from 
  1934.         //COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEA to RUNNING
  1935.         LOG.info(task.getTaskID() + " Ignoring status-update since " +
  1936.                  ((this.done) ? "task is 'done'" : 
  1937.                                 ("runState: " + this.taskStatus.getRunState()))
  1938.                  ); 
  1939.         return;
  1940.       }
  1941.       
  1942.       this.taskStatus.statusUpdate(taskStatus);
  1943.       this.lastProgressReport = System.currentTimeMillis();
  1944.     }
  1945.     /**
  1946.      */
  1947.     public long getLastProgressReport() {
  1948.       return lastProgressReport;
  1949.     }
  1950.     /**
  1951.      */
  1952.     public TaskStatus.State getRunState() {
  1953.       return taskStatus.getRunState();
  1954.     }
  1955.     /**
  1956.      * The task's configured timeout.
  1957.      * 
  1958.      * @return the task's configured timeout.
  1959.      */
  1960.     public long getTaskTimeout() {
  1961.       return taskTimeout;
  1962.     }
  1963.         
  1964.     /**
  1965.      * The task has reported some diagnostic info about its status
  1966.      */
  1967.     public synchronized void reportDiagnosticInfo(String info) {
  1968.       this.diagnosticInfo.append(info);
  1969.     }
  1970.     
  1971.     public synchronized void reportNextRecordRange(SortedRanges.Range range) {
  1972.       this.taskStatus.setNextRecordRange(range);
  1973.     }
  1974.     /**
  1975.      * The task is reporting that it's done running
  1976.      */
  1977.     public synchronized void reportDone() {
  1978.       if (isCleaningup()) {
  1979.         if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
  1980.           this.taskStatus.setRunState(TaskStatus.State.FAILED);
  1981.         } else if (this.taskStatus.getRunState() == 
  1982.                    TaskStatus.State.KILLED_UNCLEAN) {
  1983.           this.taskStatus.setRunState(TaskStatus.State.KILLED);
  1984.         }
  1985.       } else {
  1986.         this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
  1987.       }
  1988.       this.taskStatus.setProgress(1.0f);
  1989.       this.taskStatus.setFinishTime(System.currentTimeMillis());
  1990.       this.done = true;
  1991.       jvmManager.taskFinished(runner);
  1992.       runner.signalDone();
  1993.       LOG.info("Task " + task.getTaskID() + " is done.");
  1994.       LOG.info("reported output size for " + task.getTaskID() +  "  was " + taskStatus.getOutputSize());
  1995.     }
  1996.     
  1997.     public boolean wasKilled() {
  1998.       return wasKilled;
  1999.     }
  2000.     void reportTaskFinished() {
  2001.       taskFinished();
  2002.       releaseSlot();
  2003.     }
  2004.     /* State changes:
  2005.      * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED/KILLED_UNCLEAN/KILLED
  2006.      * FAILED_UNCLEAN -> FAILED
  2007.      * KILLED_UNCLEAN -> KILLED 
  2008.      */
  2009.     private void setTaskFailState(boolean wasFailure) {
  2010.       // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
  2011.       if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
  2012.         taskStatus.setRunState(TaskStatus.State.FAILED);
  2013.       } else if (taskStatus.getRunState() == 
  2014.                  TaskStatus.State.KILLED_UNCLEAN) {
  2015.         taskStatus.setRunState(TaskStatus.State.KILLED);
  2016.       } else if (task.isMapOrReduce() && 
  2017.                  taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
  2018.         if (wasFailure) {
  2019.           taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
  2020.         } else {
  2021.           taskStatus.setRunState(TaskStatus.State.KILLED_UNCLEAN);
  2022.         }
  2023.       } else {
  2024.         if (wasFailure) {
  2025.           taskStatus.setRunState(TaskStatus.State.FAILED);
  2026.         } else {
  2027.           taskStatus.setRunState(TaskStatus.State.KILLED);
  2028.         }
  2029.       }
  2030.     }
  2031.     
  2032.     /**
  2033.      * The task has actually finished running.
  2034.      */
  2035.     public void taskFinished() {
  2036.       long start = System.currentTimeMillis();
  2037.       //
  2038.       // Wait until task reports as done.  If it hasn't reported in,
  2039.       // wait for a second and try again.
  2040.       //
  2041.       while (!done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) {
  2042.         try {
  2043.           Thread.sleep(1000);
  2044.         } catch (InterruptedException ie) {
  2045.         }
  2046.       }
  2047.       //
  2048.       // Change state to success or failure, depending on whether
  2049.       // task was 'done' before terminating
  2050.       //
  2051.       boolean needCleanup = false;
  2052.       synchronized (this) {
  2053.         // Remove the task from MemoryManager, if the task SUCCEEDED or FAILED.
  2054.         // KILLED tasks are removed in method kill(), because Kill 
  2055.         // would result in launching a cleanup attempt before 
  2056.         // TaskRunner returns; if remove happens here, it would remove
  2057.         // wrong task from memory manager.
  2058.         if (done || !wasKilled) {
  2059.           removeFromMemoryManager(task.getTaskID());
  2060.         }
  2061.         if (!done) {
  2062.           if (!wasKilled) {
  2063.             failures += 1;
  2064.             setTaskFailState(true);
  2065.             // call the script here for the failed tasks.
  2066.             if (debugCommand != null) {
  2067.               String taskStdout ="";
  2068.               String taskStderr ="";
  2069.               String taskSyslog ="";
  2070.               String jobConf = task.getJobFile();
  2071.               try {
  2072.                 // get task's stdout file 
  2073.                 taskStdout = FileUtil.makeShellPath(
  2074.                     TaskLog.getRealTaskLogFileLocation
  2075.                                   (task.getTaskID(), TaskLog.LogName.STDOUT));
  2076.                 // get task's stderr file 
  2077.                 taskStderr = FileUtil.makeShellPath(
  2078.                     TaskLog.getRealTaskLogFileLocation
  2079.                                   (task.getTaskID(), TaskLog.LogName.STDERR));
  2080.                 // get task's syslog file 
  2081.                 taskSyslog = FileUtil.makeShellPath(
  2082.                     TaskLog.getRealTaskLogFileLocation
  2083.                                   (task.getTaskID(), TaskLog.LogName.SYSLOG));
  2084.               } catch(IOException e){
  2085.                 LOG.warn("Exception finding task's stdout/err/syslog files");
  2086.               }
  2087.               File workDir = null;
  2088.               try {
  2089.                 workDir = new File(lDirAlloc.getLocalPathToRead(
  2090.                                      TaskTracker.getLocalTaskDir( 
  2091.                                        task.getJobID().toString(), 
  2092.                                        task.getTaskID().toString(),
  2093.                                        task.isTaskCleanupTask())
  2094.                                      + Path.SEPARATOR + MRConstants.WORKDIR,
  2095.                                      localJobConf). toString());
  2096.               } catch (IOException e) {
  2097.                 LOG.warn("Working Directory of the task " + task.getTaskID() +
  2098.                   "doesnt exist. Caught exception " +
  2099.                           StringUtils.stringifyException(e));
  2100.               }
  2101.               // Build the command  
  2102.               File stdout = TaskLog.getRealTaskLogFileLocation(
  2103.                                    task.getTaskID(), TaskLog.LogName.DEBUGOUT);
  2104.               // add pipes program as argument if it exists.
  2105.               String program ="";
  2106.               String executable = Submitter.getExecutable(localJobConf);
  2107.               if ( executable != null) {
  2108.              try {
  2109.                program = new URI(executable).getFragment();
  2110.              } catch (URISyntaxException ur) {
  2111.                LOG.warn("Problem in the URI fragment for pipes executable");
  2112.              }   
  2113.               }
  2114.               String [] debug = debugCommand.split(" ");
  2115.               Vector<String> vargs = new Vector<String>();
  2116.               for (String component : debug) {
  2117.                 vargs.add(component);
  2118.               }
  2119.               vargs.add(taskStdout);
  2120.               vargs.add(taskStderr);
  2121.               vargs.add(taskSyslog);
  2122.               vargs.add(jobConf);
  2123.               vargs.add(program);
  2124.               try {
  2125.                 List<String>  wrappedCommand = TaskLog.captureDebugOut
  2126.                                                           (vargs, stdout);
  2127.                 // run the script.
  2128.                 try {
  2129.                   runScript(wrappedCommand, workDir);
  2130.                 } catch (IOException ioe) {
  2131.                   LOG.warn("runScript failed with: " + StringUtils.
  2132.                                                       stringifyException(ioe));
  2133.                 }
  2134.               } catch(IOException e) {
  2135.                 LOG.warn("Error in preparing wrapped debug command");
  2136.               }
  2137.               // add all lines of debug out to diagnostics
  2138.               try {
  2139.                 int num = localJobConf.getInt("mapred.debug.out.lines", -1);
  2140.                 addDiagnostics(FileUtil.makeShellPath(stdout),num,"DEBUG OUT");
  2141.               } catch(IOException ioe) {
  2142.                 LOG.warn("Exception in add diagnostics!");
  2143.               }
  2144.             }
  2145.           }
  2146.           taskStatus.setProgress(0.0f);
  2147.         }
  2148.         this.taskStatus.setFinishTime(System.currentTimeMillis());
  2149.         needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || 
  2150.                 taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
  2151.                 taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || 
  2152.                 taskStatus.getRunState() == TaskStatus.State.KILLED);
  2153.       }
  2154.       //
  2155.       // If the task has failed, or if the task was killAndCleanup()'ed,
  2156.       // we should clean up right away.  We only wait to cleanup
  2157.       // if the task succeeded, and its results might be useful
  2158.       // later on to downstream job processing.
  2159.       //
  2160.       if (needCleanup) {
  2161.         removeTaskFromJob(task.getJobID(), this);
  2162.       }
  2163.       try {
  2164.         cleanup(needCleanup);
  2165.       } catch (IOException ie) {
  2166.       }
  2167.     }
  2168.     
  2169.     /**
  2170.      * Runs the script given in args
  2171.      * @param args script name followed by its argumnets
  2172.      * @param dir current working directory.
  2173.      * @throws IOException
  2174.      */
  2175.     public void runScript(List<String> args, File dir) throws IOException {
  2176.       ShellCommandExecutor shexec = 
  2177.               new ShellCommandExecutor(args.toArray(new String[0]), dir);
  2178.       shexec.execute();
  2179.       int exitCode = shexec.getExitCode();
  2180.       if (exitCode != 0) {
  2181.         throw new IOException("Task debug script exit with nonzero status of " 
  2182.                               + exitCode + ".");
  2183.       }
  2184.     }
  2185.     /**
  2186.      * Add last 'num' lines of the given file to the diagnostics.
  2187.      * if num =-1, all the lines of file are added to the diagnostics.
  2188.      * @param file The file from which to collect diagnostics.
  2189.      * @param num The number of lines to be sent to diagnostics.
  2190.      * @param tag The tag is printed before the diagnostics are printed. 
  2191.      */
  2192.     public void addDiagnostics(String file, int num, String tag) {
  2193.       RandomAccessFile rafile = null;
  2194.       try {
  2195.         rafile = new RandomAccessFile(file,"r");
  2196.         int no_lines =0;
  2197.         String line = null;
  2198.         StringBuffer tail = new StringBuffer();
  2199.         tail.append("n-------------------- "+tag+"---------------------n");
  2200.         String[] lines = null;
  2201.         if (num >0) {
  2202.           lines = new String[num];
  2203.         }
  2204.         while ((line = rafile.readLine()) != null) {
  2205.           no_lines++;
  2206.           if (num >0) {
  2207.             if (no_lines <= num) {
  2208.               lines[no_lines-1] = line;
  2209.             }
  2210.             else { // shift them up
  2211.               for (int i=0; i<num-1; ++i) {
  2212.                 lines[i] = lines[i+1];
  2213.               }
  2214.               lines[num-1] = line;
  2215.             }
  2216.           }
  2217.           else if (num == -1) {
  2218.             tail.append(line); 
  2219.             tail.append("n");
  2220.           }
  2221.         }
  2222.         int n = no_lines > num ?num:no_lines;
  2223.         if (num >0) {
  2224.           for (int i=0;i<n;i++) {
  2225.             tail.append(lines[i]);
  2226.             tail.append("n");
  2227.           }
  2228.         }
  2229.         if(n!=0)
  2230.           reportDiagnosticInfo(tail.toString());
  2231.       } catch (FileNotFoundException fnfe){
  2232.         LOG.warn("File "+file+ " not found");
  2233.       } catch (IOException ioe){
  2234.         LOG.warn("Error reading file "+file);
  2235.       } finally {
  2236.          try {
  2237.            if (rafile != null) {
  2238.              rafile.close();
  2239.            }
  2240.          } catch (IOException ioe) {
  2241.            LOG.warn("Error closing file "+file);
  2242.          }
  2243.       }
  2244.     }
  2245.     
  2246.     /**
  2247.      * We no longer need anything from this task, as the job has
  2248.      * finished.  If the task is still running, kill it and clean up.
  2249.      * 
  2250.      * @param wasFailure did the task fail, as opposed to was it killed by
  2251.      *                   the framework
  2252.      */
  2253.     public void jobHasFinished(boolean wasFailure) throws IOException {
  2254.       // Kill the task if it is still running
  2255.       synchronized(this){
  2256.         if (getRunState() == TaskStatus.State.RUNNING ||
  2257.             getRunState() == TaskStatus.State.UNASSIGNED ||
  2258.             getRunState() == TaskStatus.State.COMMIT_PENDING ||
  2259.             isCleaningup()) {
  2260.           kill(wasFailure);
  2261.         }
  2262.       }
  2263.       
  2264.       // Cleanup on the finished task
  2265.       cleanup(true);
  2266.     }
  2267.     /**
  2268.      * Something went wrong and the task must be killed.
  2269.      * @param wasFailure was it a failure (versus a kill request)?
  2270.      */
  2271.     public synchronized void kill(boolean wasFailure) throws IOException {
  2272.       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
  2273.           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
  2274.           isCleaningup()) {
  2275.         wasKilled = true;
  2276.         if (wasFailure) {
  2277.           failures += 1;
  2278.         }
  2279.         // runner could be null if task-cleanup attempt is not localized yet
  2280.         if (runner != null) {
  2281.           runner.kill();
  2282.         }
  2283.         setTaskFailState(wasFailure);
  2284.       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
  2285.         if (wasFailure) {
  2286.           failures += 1;
  2287.           taskStatus.setRunState(TaskStatus.State.FAILED);
  2288.         } else {
  2289.           taskStatus.setRunState(TaskStatus.State.KILLED);
  2290.         }
  2291.       }
  2292.       removeFromMemoryManager(task.getTaskID());
  2293.       releaseSlot();
  2294.     }
  2295.     
  2296.     private synchronized void releaseSlot() {
  2297.       if (slotTaken) {
  2298.         if (launcher != null) {
  2299.           launcher.addFreeSlot();
  2300.         }
  2301.         slotTaken = false;
  2302.       }
  2303.     }
  2304.     /**
  2305.      * The map output has been lost.
  2306.      */
  2307.     private synchronized void mapOutputLost(String failure
  2308.                                            ) throws IOException {
  2309.       if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING || 
  2310.           taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
  2311.         // change status to failure
  2312.         LOG.info("Reporting output lost:"+task.getTaskID());
  2313.         taskStatus.setRunState(TaskStatus.State.FAILED);
  2314.         taskStatus.setProgress(0.0f);
  2315.         reportDiagnosticInfo("Map output lost, rescheduling: " + 
  2316.                              failure);
  2317.         runningTasks.put(task.getTaskID(), this);
  2318.         mapTotal++;
  2319.       } else {
  2320.         LOG.warn("Output already reported lost:"+task.getTaskID());
  2321.       }
  2322.     }
  2323.     /**
  2324.      * We no longer need anything from this task.  Either the 
  2325.      * controlling job is all done and the files have been copied
  2326.      * away, or the task failed and we don't need the remains.
  2327.      * Any calls to cleanup should not lock the tip first.
  2328.      * cleanup does the right thing- updates tasks in Tasktracker
  2329.      * by locking tasktracker first and then locks the tip.
  2330.      * 
  2331.      * if needCleanup is true, the whole task directory is cleaned up.
  2332.      * otherwise the current working directory of the task 
  2333.      * i.e. &lt;taskid&gt;/work is cleaned up.
  2334.      */
  2335.     void cleanup(boolean needCleanup) throws IOException {
  2336.       TaskAttemptID taskId = task.getTaskID();
  2337.       LOG.debug("Cleaning up " + taskId);
  2338.       synchronized (TaskTracker.this) {
  2339.         if (needCleanup) {
  2340.           // see if tasks data structure is holding this tip.
  2341.           // tasks could hold the tip for cleanup attempt, if cleanup attempt 
  2342.           // got launched before this method.
  2343.           if (tasks.get(taskId) == this) {
  2344.             tasks.remove(taskId);
  2345.           }
  2346.         }
  2347.         synchronized (this){
  2348.           if (alwaysKeepTaskFiles ||
  2349.               (taskStatus.getRunState() == TaskStatus.State.FAILED && 
  2350.                keepFailedTaskFiles)) {
  2351.             return;
  2352.           }
  2353.         }
  2354.       }
  2355.       synchronized (this) {
  2356.         try {
  2357.           // localJobConf could be null if localization has not happened
  2358.           // then no cleanup will be required.
  2359.           if (localJobConf == null) {
  2360.             return;
  2361.           }
  2362.           String taskDir = getLocalTaskDir(task.getJobID().toString(),
  2363.                              taskId.toString(), task.isTaskCleanupTask());
  2364.           if (needCleanup) {
  2365.             if (runner != null) {
  2366.               //cleans up the output directory of the task (where map outputs 
  2367.               //and reduce inputs get stored)
  2368.               runner.close();
  2369.             }
  2370.             //We don't delete the workdir
  2371.             //since some other task (running in the same JVM) 
  2372.             //might be using the dir. The JVM running the tasks would clean
  2373.             //the workdir per a task in the task process itself.
  2374.             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
  2375.               directoryCleanupThread.addToQueue(defaultJobConf,
  2376.                   getLocalFiles(defaultJobConf,
  2377.                   taskDir));
  2378.             }  
  2379.             
  2380.             else {
  2381.               directoryCleanupThread.addToQueue(defaultJobConf,
  2382.                   getLocalFiles(defaultJobConf,
  2383.                 taskDir+"/job.xml"));
  2384.             }
  2385.           } else {
  2386.             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
  2387.               directoryCleanupThread.addToQueue(defaultJobConf,
  2388.                   getLocalFiles(defaultJobConf,
  2389.                   taskDir+"/work"));
  2390.             }  
  2391.           }
  2392.         } catch (Throwable ie) {
  2393.           LOG.info("Error cleaning up task runner: " + 
  2394.                    StringUtils.stringifyException(ie));
  2395.         }
  2396.       }
  2397.     }
  2398.         
  2399.     @Override
  2400.     public boolean equals(Object obj) {
  2401.       return (obj instanceof TaskInProgress) &&
  2402.         task.getTaskID().equals
  2403.         (((TaskInProgress) obj).getTask().getTaskID());
  2404.     }
  2405.         
  2406.     @Override
  2407.     public int hashCode() {
  2408.       return task.getTaskID().hashCode();
  2409.     }
  2410.   }
  2411.     
  2412.   // ///////////////////////////////////////////////////////////////
  2413.   // TaskUmbilicalProtocol
  2414.   /////////////////////////////////////////////////////////////////
  2415.   /**
  2416.    * Called upon startup by the child process, to fetch Task data.
  2417.    */
  2418.   public synchronized JvmTask getTask(JVMId jvmId) 
  2419.   throws IOException {
  2420.     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
  2421.     if (!jvmManager.isJvmKnown(jvmId)) {
  2422.       LOG.info("Killing unknown JVM " + jvmId);
  2423.       return new JvmTask(null, true);
  2424.     }
  2425.     RunningJob rjob = runningJobs.get(jvmId.getJobId());
  2426.     if (rjob == null) { //kill the JVM since the job is dead
  2427.       LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
  2428.                " is dead");
  2429.       jvmManager.killJvm(jvmId);
  2430.       return new JvmTask(null, true);
  2431.     }
  2432.     TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
  2433.     if (tip == null) {
  2434.       return new JvmTask(null, false);
  2435.     }
  2436.     if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present
  2437.       LOG.info("JVM with ID: " + jvmId + " given task: " + 
  2438.           tip.getTask().getTaskID());
  2439.       return new JvmTask(tip.getTask(), false);
  2440.     } else {
  2441.       LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " + 
  2442.           tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState());
  2443.       return new JvmTask(null, true);
  2444.     }
  2445.   }
  2446.   /**
  2447.    * Called periodically to report Task progress, from 0.0 to 1.0.
  2448.    */
  2449.   public synchronized boolean statusUpdate(TaskAttemptID taskid, 
  2450.                                               TaskStatus taskStatus) 
  2451.   throws IOException {
  2452.     TaskInProgress tip = tasks.get(taskid);
  2453.     if (tip != null) {
  2454.       tip.reportProgress(taskStatus);
  2455.       return true;
  2456.     } else {
  2457.       LOG.warn("Progress from unknown child task: "+taskid);
  2458.       return false;
  2459.     }
  2460.   }
  2461.   /**
  2462.    * Called when the task dies before completion, and we want to report back
  2463.    * diagnostic info
  2464.    */
  2465.   public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
  2466.     TaskInProgress tip = tasks.get(taskid);
  2467.     if (tip != null) {
  2468.       tip.reportDiagnosticInfo(info);
  2469.     } else {
  2470.       LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
  2471.     }
  2472.   }
  2473.   
  2474.   public synchronized void reportNextRecordRange(TaskAttemptID taskid, 
  2475.       SortedRanges.Range range) throws IOException {
  2476.     TaskInProgress tip = tasks.get(taskid);
  2477.     if (tip != null) {
  2478.       tip.reportNextRecordRange(range);
  2479.     } else {
  2480.       LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " +
  2481.        "Ignored.");
  2482.     }
  2483.   }
  2484.   /** Child checking to see if we're alive.  Normally does nothing.*/
  2485.   public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
  2486.     return tasks.get(taskid) != null;
  2487.   }
  2488.   /**
  2489.    * Task is reporting that it is in commit_pending
  2490.    * and it is waiting for the commit Response
  2491.    */
  2492.   public synchronized void commitPending(TaskAttemptID taskid,
  2493.                                          TaskStatus taskStatus) 
  2494.   throws IOException {
  2495.     LOG.info("Task " + taskid + " is in commit-pending," +"" +
  2496.              " task state:" +taskStatus.getRunState());
  2497.     statusUpdate(taskid, taskStatus);
  2498.     reportTaskFinished(taskid, true);
  2499.   }
  2500.   
  2501.   /**
  2502.    * Child checking whether it can commit 
  2503.    */
  2504.   public synchronized boolean canCommit(TaskAttemptID taskid) {
  2505.     return commitResponses.contains(taskid); //don't remove it now
  2506.   }
  2507.   
  2508.   /**
  2509.    * The task is done.
  2510.    */
  2511.   public synchronized void done(TaskAttemptID taskid) 
  2512.   throws IOException {
  2513.     TaskInProgress tip = tasks.get(taskid);
  2514.     commitResponses.remove(taskid);
  2515.     if (tip != null) {
  2516.       tip.reportDone();
  2517.     } else {
  2518.       LOG.warn("Unknown child task done: "+taskid+". Ignored.");
  2519.     }
  2520.   }
  2521.   /** 
  2522.    * A reduce-task failed to shuffle the map-outputs. Kill the task.
  2523.    */  
  2524.   public synchronized void shuffleError(TaskAttemptID taskId, String message) 
  2525.   throws IOException { 
  2526.     LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
  2527.     TaskInProgress tip = runningTasks.get(taskId);
  2528.     tip.reportDiagnosticInfo("Shuffle Error: " + message);
  2529.     purgeTask(tip, true);
  2530.   }
  2531.   /** 
  2532.    * A child task had a local filesystem error. Kill the task.
  2533.    */  
  2534.   public synchronized void fsError(TaskAttemptID taskId, String message) 
  2535.   throws IOException {
  2536.     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
  2537.     TaskInProgress tip = runningTasks.get(taskId);
  2538.     tip.reportDiagnosticInfo("FSError: " + message);
  2539.     purgeTask(tip, true);
  2540.   }
  2541.   public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
  2542.       JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) 
  2543.   throws IOException {
  2544.     TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
  2545.     synchronized (shouldReset) {
  2546.       if (shouldReset.remove(id)) {
  2547.         return new MapTaskCompletionEventsUpdate(mapEvents, true);
  2548.       }
  2549.     }
  2550.     RunningJob rjob;
  2551.     synchronized (runningJobs) {
  2552.       rjob = runningJobs.get(jobId);          
  2553.       if (rjob != null) {
  2554.         synchronized (rjob) {
  2555.           FetchStatus f = rjob.getFetchStatus();
  2556.           if (f != null) {
  2557.             mapEvents = f.getMapEvents(fromEventId, maxLocs);
  2558.           }
  2559.         }
  2560.       }
  2561.     }
  2562.     return new MapTaskCompletionEventsUpdate(mapEvents, false);
  2563.   }
  2564.     
  2565.   /////////////////////////////////////////////////////
  2566.   //  Called by TaskTracker thread after task process ends
  2567.   /////////////////////////////////////////////////////
  2568.   /**
  2569.    * The task is no longer running.  It may not have completed successfully
  2570.    */
  2571.   void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
  2572.     TaskInProgress tip;
  2573.     synchronized (this) {
  2574.       tip = tasks.get(taskid);
  2575.     }
  2576.     if (tip != null) {
  2577.       if (!commitPending) {
  2578.         tip.reportTaskFinished();
  2579.       }
  2580.     } else {
  2581.       LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
  2582.     }
  2583.   }
  2584.   
  2585.   /**
  2586.    * A completed map task's output has been lost.
  2587.    */
  2588.   public synchronized void mapOutputLost(TaskAttemptID taskid,
  2589.                                          String errorMsg) throws IOException {
  2590.     TaskInProgress tip = tasks.get(taskid);
  2591.     if (tip != null) {
  2592.       tip.mapOutputLost(errorMsg);
  2593.     } else {
  2594.       LOG.warn("Unknown child with bad map output: "+taskid+". Ignored.");
  2595.     }
  2596.   }
  2597.     
  2598.   /**
  2599.    *  The datastructure for initializing a job
  2600.    */
  2601.   static class RunningJob{
  2602.     private JobID jobid; 
  2603.     private JobConf jobConf;
  2604.     // keep this for later use
  2605.     volatile Set<TaskInProgress> tasks;
  2606.     boolean localized;
  2607.     boolean keepJobFiles;
  2608.     FetchStatus f;
  2609.     RunningJob(JobID jobid) {
  2610.       this.jobid = jobid;
  2611.       localized = false;
  2612.       tasks = new HashSet<TaskInProgress>();
  2613.       keepJobFiles = false;
  2614.     }
  2615.       
  2616.     JobID getJobID() {
  2617.       return jobid;
  2618.     }
  2619.       
  2620.     void setFetchStatus(FetchStatus f) {
  2621.       this.f = f;
  2622.     }
  2623.       
  2624.     FetchStatus getFetchStatus() {
  2625.       return f;
  2626.     }
  2627.   }
  2628.   /**
  2629.    * Get the name for this task tracker.
  2630.    * @return the string like "tracker_mymachine:50010"
  2631.    */
  2632.   String getName() {
  2633.     return taskTrackerName;
  2634.   }
  2635.     
  2636.   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
  2637.                                           boolean sendCounters) {
  2638.     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
  2639.     for(TaskInProgress tip: runningTasks.values()) {
  2640.       TaskStatus status = tip.getStatus();
  2641.       status.setIncludeCounters(sendCounters);
  2642.       status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
  2643.       // send counters for finished or failed tasks and commit pending tasks
  2644.       if (status.getRunState() != TaskStatus.State.RUNNING) {
  2645.         status.setIncludeCounters(true);
  2646.       }
  2647.       result.add((TaskStatus)status.clone());
  2648.       status.clearStatus();
  2649.     }
  2650.     return result;
  2651.   }
  2652.   /**
  2653.    * Get the list of tasks that will be reported back to the 
  2654.    * job tracker in the next heartbeat cycle.
  2655.    * @return a copy of the list of TaskStatus objects
  2656.    */
  2657.   synchronized List<TaskStatus> getRunningTaskStatuses() {
  2658.     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
  2659.     for(TaskInProgress tip: runningTasks.values()) {
  2660.       result.add(tip.getStatus());
  2661.     }
  2662.     return result;
  2663.   }
  2664.   /**
  2665.    * Get the list of stored tasks on this task tracker.
  2666.    * @return
  2667.    */
  2668.   synchronized List<TaskStatus> getNonRunningTasks() {
  2669.     List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
  2670.     for(Map.Entry<TaskAttemptID, TaskInProgress> task: tasks.entrySet()) {
  2671.       if (!runningTasks.containsKey(task.getKey())) {
  2672.         result.add(task.getValue().getStatus());
  2673.       }
  2674.     }
  2675.     return result;
  2676.   }
  2677.   /**
  2678.    * Get the list of tasks from running jobs on this task tracker.
  2679.    * @return a copy of the list of TaskStatus objects
  2680.    */
  2681.   synchronized List<TaskStatus> getTasksFromRunningJobs() {
  2682.     List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
  2683.     for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
  2684.       RunningJob rjob = item.getValue();
  2685.       synchronized (rjob) {
  2686.         for (TaskInProgress tip : rjob.tasks) {
  2687.           result.add(tip.getStatus());
  2688.         }
  2689.       }
  2690.     }
  2691.     return result;
  2692.   }
  2693.   
  2694.   /**
  2695.    * Get the default job conf for this tracker.
  2696.    */
  2697.   JobConf getJobConf() {
  2698.     return fConf;
  2699.   }
  2700.     
  2701.   /**
  2702.    * Check if the given local directories
  2703.    * (and parent directories, if necessary) can be created.
  2704.    * @param localDirs where the new TaskTracker should keep its local files.
  2705.    * @throws DiskErrorException if all local directories are not writable
  2706.    */
  2707.   private static void checkLocalDirs(String[] localDirs) 
  2708.     throws DiskErrorException {
  2709.     boolean writable = false;
  2710.         
  2711.     if (localDirs != null) {
  2712.       for (int i = 0; i < localDirs.length; i++) {
  2713.         try {
  2714.           DiskChecker.checkDir(new File(localDirs[i]));
  2715.           writable = true;
  2716.         } catch(DiskErrorException e) {
  2717.           LOG.warn("Task Tracker local " + e.getMessage());
  2718.         }
  2719.       }
  2720.     }
  2721.     if (!writable)
  2722.       throw new DiskErrorException(
  2723.                                    "all local directories are not writable");
  2724.   }
  2725.     
  2726.   /**
  2727.    * Is this task tracker idle?
  2728.    * @return has this task tracker finished and cleaned up all of its tasks?
  2729.    */
  2730.   public synchronized boolean isIdle() {
  2731.     return tasks.isEmpty() && tasksToCleanup.isEmpty();
  2732.   }
  2733.     
  2734.   /**
  2735.    * Start the TaskTracker, point toward the indicated JobTracker
  2736.    */
  2737.   public static void main(String argv[]) throws Exception {
  2738.     StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
  2739.     if (argv.length != 0) {
  2740.       System.out.println("usage: TaskTracker");
  2741.       System.exit(-1);
  2742.     }
  2743.     try {
  2744.       JobConf conf=new JobConf();
  2745.       // enable the server to track time spent waiting on locks
  2746.       ReflectionUtils.setContentionTracing
  2747.         (conf.getBoolean("tasktracker.contention.tracking", false));
  2748.       new TaskTracker(conf).run();
  2749.     } catch (Throwable e) {
  2750.       LOG.error("Can not start task tracker because "+
  2751.                 StringUtils.stringifyException(e));
  2752.       System.exit(-1);
  2753.     }
  2754.   }
  2755.   /**
  2756.    * This class is used in TaskTracker's Jetty to serve the map outputs
  2757.    * to other nodes.
  2758.    */
  2759.   public static class MapOutputServlet extends HttpServlet {
  2760.     private static final int MAX_BYTES_TO_READ = 64 * 1024;
  2761.     @Override
  2762.     public void doGet(HttpServletRequest request, 
  2763.                       HttpServletResponse response
  2764.                       ) throws ServletException, IOException {
  2765.       String mapId = request.getParameter("map");
  2766.       String reduceId = request.getParameter("reduce");
  2767.       String jobId = request.getParameter("job");
  2768.       if (jobId == null) {
  2769.         throw new IOException("job parameter is required");
  2770.       }
  2771.       if (mapId == null || reduceId == null) {
  2772.         throw new IOException("map and reduce parameters are required");
  2773.       }
  2774.       ServletContext context = getServletContext();
  2775.       int reduce = Integer.parseInt(reduceId);
  2776.       byte[] buffer = new byte[MAX_BYTES_TO_READ];
  2777.       // true iff IOException was caused by attempt to access input
  2778.       boolean isInputException = true;
  2779.       OutputStream outStream = null;
  2780.       FSDataInputStream mapOutputIn = null;
  2781.  
  2782.       long totalRead = 0;
  2783.       ShuffleServerMetrics shuffleMetrics =
  2784.         (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
  2785.       TaskTracker tracker = 
  2786.         (TaskTracker) context.getAttribute("task.tracker");
  2787.       try {
  2788.         shuffleMetrics.serverHandlerBusy();
  2789.         outStream = response.getOutputStream();
  2790.         JobConf conf = (JobConf) context.getAttribute("conf");
  2791.         LocalDirAllocator lDirAlloc = 
  2792.           (LocalDirAllocator)context.getAttribute("localDirAllocator");
  2793.         FileSystem rfs = ((LocalFileSystem)
  2794.             context.getAttribute("local.file.system")).getRaw();
  2795.         // Index file
  2796.         Path indexFileName = lDirAlloc.getLocalPathToRead(
  2797.             TaskTracker.getIntermediateOutputDir(jobId, mapId)
  2798.             + "/file.out.index", conf);
  2799.         
  2800.         // Map-output file
  2801.         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
  2802.             TaskTracker.getIntermediateOutputDir(jobId, mapId)
  2803.             + "/file.out", conf);
  2804.         /**
  2805.          * Read the index file to get the information about where
  2806.          * the map-output for the given reducer is available. 
  2807.          */
  2808.        IndexRecord info = 
  2809.           tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
  2810.           
  2811.         //set the custom "Raw-Map-Output-Length" http header to 
  2812.         //the raw (decompressed) length
  2813.         response.setHeader(RAW_MAP_OUTPUT_LENGTH,
  2814.             Long.toString(info.rawLength));
  2815.         //set the custom "Map-Output-Length" http header to 
  2816.         //the actual number of bytes being transferred
  2817.         response.setHeader(MAP_OUTPUT_LENGTH,
  2818.             Long.toString(info.partLength));
  2819.         //use the same buffersize as used for reading the data from disk
  2820.         response.setBufferSize(MAX_BYTES_TO_READ);
  2821.         
  2822.         /**
  2823.          * Read the data from the sigle map-output file and
  2824.          * send it to the reducer.
  2825.          */
  2826.         //open the map-output file
  2827.         mapOutputIn = rfs.open(mapOutputFileName);
  2828.         //seek to the correct offset for the reduce
  2829.         mapOutputIn.seek(info.startOffset);
  2830.         long rem = info.partLength;
  2831.         int len =
  2832.           mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
  2833.         while (rem > 0 && len >= 0) {
  2834.           rem -= len;
  2835.           try {
  2836.             shuffleMetrics.outputBytes(len);
  2837.             outStream.write(buffer, 0, len);
  2838.             outStream.flush();
  2839.           } catch (IOException ie) {
  2840.             isInputException = false;
  2841.             throw ie;
  2842.           }
  2843.           totalRead += len;
  2844.           len =
  2845.             mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
  2846.         }
  2847.         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
  2848.                  " from map: " + mapId + " given " + info.partLength + "/" + 
  2849.                  info.rawLength);
  2850.       } catch (IOException ie) {
  2851.         Log log = (Log) context.getAttribute("log");
  2852.         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
  2853.                            ") failed :n"+
  2854.                            StringUtils.stringifyException(ie));
  2855.         log.warn(errorMsg);
  2856.         if (isInputException) {
  2857.           tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
  2858.         }
  2859.         response.sendError(HttpServletResponse.SC_GONE, errorMsg);
  2860.         shuffleMetrics.failedOutput();
  2861.         throw ie;
  2862.       } finally {
  2863.         if (null != mapOutputIn) {
  2864.           mapOutputIn.close();
  2865.         }
  2866.         shuffleMetrics.serverHandlerFree();
  2867.         if (ClientTraceLog.isInfoEnabled()) {
  2868.           ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
  2869.                 request.getLocalAddr() + ":" + request.getLocalPort(),
  2870.                 request.getRemoteAddr() + ":" + request.getRemotePort(),
  2871.                 totalRead, "MAPRED_SHUFFLE", mapId));
  2872.         }
  2873.       }
  2874.       outStream.close();
  2875.       shuffleMetrics.successOutput();
  2876.     }
  2877.   }
  2878.   // get the full paths of the directory in all the local disks.
  2879.   private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
  2880.     String[] localDirs = conf.getLocalDirs();
  2881.     Path[] paths = new Path[localDirs.length];
  2882.     FileSystem localFs = FileSystem.getLocal(conf);
  2883.     for (int i = 0; i < localDirs.length; i++) {
  2884.       paths[i] = new Path(localDirs[i], subdir);
  2885.       paths[i] = paths[i].makeQualified(localFs);
  2886.     }
  2887.     return paths;
  2888.   }
  2889.   int getMaxCurrentMapTasks() {
  2890.     return maxCurrentMapTasks;
  2891.   }
  2892.   
  2893.   int getMaxCurrentReduceTasks() {
  2894.     return maxCurrentReduceTasks;
  2895.   }
  2896.   /**
  2897.    * Is the TaskMemoryManager Enabled on this system?
  2898.    * @return true if enabled, false otherwise.
  2899.    */
  2900.   public boolean isTaskMemoryManagerEnabled() {
  2901.     return taskMemoryManagerEnabled;
  2902.   }
  2903.   
  2904.   public TaskMemoryManagerThread getTaskMemoryManager() {
  2905.     return taskMemoryManager;
  2906.   }
  2907.   /**
  2908.    * Normalize the negative values in configuration
  2909.    * 
  2910.    * @param val
  2911.    * @return normalized val
  2912.    */
  2913.   private long normalizeMemoryConfigValue(long val) {
  2914.     if (val < 0) {
  2915.       val = JobConf.DISABLED_MEMORY_LIMIT;
  2916.     }
  2917.     return val;
  2918.   }
  2919.   /**
  2920.    * Memory-related setup
  2921.    */
  2922.   private void initializeMemoryManagement() {
  2923.     Class<? extends MemoryCalculatorPlugin> clazz =
  2924.         fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  2925.             null, MemoryCalculatorPlugin.class);
  2926.     MemoryCalculatorPlugin memoryCalculatorPlugin =
  2927.         (MemoryCalculatorPlugin) MemoryCalculatorPlugin
  2928.             .getMemoryCalculatorPlugin(clazz, fConf);
  2929.     LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
  2930.     if (memoryCalculatorPlugin != null) {
  2931.       totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
  2932.       if (totalVirtualMemoryOnTT <= 0) {
  2933.         LOG.warn("TaskTracker's totalVmem could not be calculated. "
  2934.             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
  2935.         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  2936.       }
  2937.       totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
  2938.       if (totalPmemOnTT <= 0) {
  2939.         LOG.warn("TaskTracker's totalPmem could not be calculated. "
  2940.             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
  2941.         totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  2942.       }
  2943.     }
  2944.     reservedVirtualMemory =
  2945.         normalizeMemoryConfigValue(fConf.getLong(
  2946.             TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
  2947.             JobConf.DISABLED_MEMORY_LIMIT));
  2948.     reservedPmem =
  2949.         normalizeMemoryConfigValue(fConf.getLong(
  2950.             TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
  2951.             JobConf.DISABLED_MEMORY_LIMIT));
  2952.     defaultMaxVmPerTask =
  2953.         normalizeMemoryConfigValue(fConf.getLong(
  2954.             JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  2955.             JobConf.DISABLED_MEMORY_LIMIT));
  2956.     limitMaxVmPerTask =
  2957.         normalizeMemoryConfigValue(fConf.getLong(
  2958.             JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  2959.             JobConf.DISABLED_MEMORY_LIMIT));
  2960.     // start the taskMemoryManager thread only if enabled
  2961.     setTaskMemoryManagerEnabledFlag();
  2962.     if (isTaskMemoryManagerEnabled()) {
  2963.       taskMemoryManager = new TaskMemoryManagerThread(this);
  2964.       taskMemoryManager.setDaemon(true);
  2965.       taskMemoryManager.start();
  2966.     }
  2967.   }
  2968.   private void setTaskMemoryManagerEnabledFlag() {
  2969.     if (!ProcfsBasedProcessTree.isAvailable()) {
  2970.       LOG.info("ProcessTree implementation is missing on this system. "
  2971.           + "TaskMemoryManager is disabled.");
  2972.       taskMemoryManagerEnabled = false;
  2973.       return;
  2974.     }
  2975.     // /// Missing configuration
  2976.     StringBuilder mesg = new StringBuilder();
  2977.     long totalVmemOnTT = getTotalVirtualMemoryOnTT();
  2978.     if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
  2979.       mesg.append("TaskTracker's totalVmem could not be calculated.n");
  2980.       taskMemoryManagerEnabled = false;
  2981.     }
  2982.     long reservedVmem = getReservedVirtualMemory();
  2983.     if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
  2984.       mesg.append("TaskTracker's reservedVmem is not configured.n");
  2985.       taskMemoryManagerEnabled = false;
  2986.     }
  2987.     if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
  2988.       mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.n");
  2989.       taskMemoryManagerEnabled = false;
  2990.     }
  2991.     if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
  2992.       mesg.append("TaskTracker's limitMaxVmPerTask is not configured.n");
  2993.       taskMemoryManagerEnabled = false;
  2994.     }
  2995.     if (!taskMemoryManagerEnabled) {
  2996.       LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
  2997.       return;
  2998.     }
  2999.     // ///// End of missing configuration
  3000.     // ///// Mis-configuration
  3001.     if (defaultMaxVmPerTask > limitMaxVmPerTask) {
  3002.       mesg.append("defaultMaxVmPerTask is mis-configured. "
  3003.           + "It shouldn't be greater than limitMaxVmPerTask. ");
  3004.       taskMemoryManagerEnabled = false;
  3005.     }
  3006.     if (reservedVmem > totalVmemOnTT) {
  3007.       mesg.append("reservedVmemOnTT is mis-configured. "
  3008.           + "It shouldn't be greater than totalVmemOnTT");
  3009.       taskMemoryManagerEnabled = false;
  3010.     }
  3011.     if (!taskMemoryManagerEnabled) {
  3012.       LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
  3013.       return;
  3014.     }
  3015.     // ///// End of mis-configuration
  3016.     taskMemoryManagerEnabled = true;
  3017.   }
  3018.   /**
  3019.    * Clean-up the task that TaskMemoryMangerThread requests to do so.
  3020.    * @param tid
  3021.    * @param wasFailure mark the task as failed or killed. 'failed' if true,
  3022.    *          'killed' otherwise
  3023.    * @param diagnosticMsg
  3024.    */
  3025.   synchronized void cleanUpOverMemoryTask(TaskAttemptID tid, boolean wasFailure,
  3026.       String diagnosticMsg) {
  3027.     TaskInProgress tip = runningTasks.get(tid);
  3028.     if (tip != null) {
  3029.       tip.reportDiagnosticInfo(diagnosticMsg);
  3030.       try {
  3031.         purgeTask(tip, wasFailure); // Marking it as failed/killed.
  3032.       } catch (IOException ioe) {
  3033.         LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
  3034.       }
  3035.     }
  3036.   }
  3037. }