TaskRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.commons.logging.*;
  20. import org.apache.hadoop.fs.*;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.filecache.*;
  23. import org.apache.hadoop.util.*;
  24. import java.io.*;
  25. import java.net.InetSocketAddress;
  26. import java.util.ArrayList;
  27. import java.util.HashMap;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.Vector;
  31. import java.net.URI;
  32. /** Base class that runs a task in a separate process.  Tasks are run in a
  33.  * separate process in order to isolate the map/reduce system code from bugs in
  34.  * user supplied map and reduce functions.
  35.  */
  36. abstract class TaskRunner extends Thread {
  37.   public static final Log LOG =
  38.     LogFactory.getLog(TaskRunner.class);
  39.   volatile boolean killed = false;
  40.   private TaskTracker.TaskInProgress tip;
  41.   private Task t;
  42.   private Object lock = new Object();
  43.   private volatile boolean done = false;
  44.   private int exitCode = -1;
  45.   private boolean exitCodeSet = false;
  46.   
  47.   private TaskTracker tracker;
  48.   protected JobConf conf;
  49.   JvmManager jvmManager;
  50.   /** 
  51.    * for cleaning up old map outputs
  52.    */
  53.   protected MapOutputFile mapOutputFile;
  54.   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
  55.       JobConf conf) {
  56.     this.tip = tip;
  57.     this.t = tip.getTask();
  58.     this.tracker = tracker;
  59.     this.conf = conf;
  60.     this.mapOutputFile = new MapOutputFile(t.getJobID());
  61.     this.mapOutputFile.setConf(conf);
  62.     this.jvmManager = tracker.getJvmManagerInstance();
  63.   }
  64.   public Task getTask() { return t; }
  65.   public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
  66.   public TaskTracker getTracker() { return tracker; }
  67.   /** Called to assemble this task's input.  This method is run in the parent
  68.    * process before the child is spawned.  It should not execute user code,
  69.    * only system code. */
  70.   public boolean prepare() throws IOException {
  71.     return true;
  72.   }
  73.   /** Called when this task's output is no longer needed.
  74.    * This method is run in the parent process after the child exits.  It should
  75.    * not execute user code, only system code.
  76.    */
  77.   public void close() throws IOException {}
  78.   private static String stringifyPathArray(Path[] p){
  79.     if (p == null){
  80.       return null;
  81.     }
  82.     StringBuffer str = new StringBuffer(p[0].toString());
  83.     for (int i = 1; i < p.length; i++){
  84.       str.append(",");
  85.       str.append(p[i].toString());
  86.     }
  87.     return str.toString();
  88.   }
  89.   
  90.   @Override
  91.   public final void run() {
  92.     try {
  93.       
  94.       //before preparing the job localize 
  95.       //all the archives
  96.       TaskAttemptID taskid = t.getTaskID();
  97.       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  98.       File jobCacheDir = null;
  99.       if (conf.getJar() != null) {
  100.         jobCacheDir = new File(
  101.                           new Path(conf.getJar()).getParent().toString());
  102.       }
  103.       File workDir = new File(lDirAlloc.getLocalPathToRead(
  104.                                 TaskTracker.getLocalTaskDir( 
  105.                                   t.getJobID().toString(), 
  106.                                   t.getTaskID().toString(),
  107.                                   t.isTaskCleanupTask())
  108.                                 + Path.SEPARATOR + MRConstants.WORKDIR,
  109.                                 conf). toString());
  110.       URI[] archives = DistributedCache.getCacheArchives(conf);
  111.       URI[] files = DistributedCache.getCacheFiles(conf);
  112.       FileStatus fileStatus;
  113.       FileSystem fileSystem;
  114.       Path localPath;
  115.       String baseDir;
  116.       if ((archives != null) || (files != null)) {
  117.         if (archives != null) {
  118.           String[] archivesTimestamps = 
  119.                                DistributedCache.getArchiveTimestamps(conf);
  120.           Path[] p = new Path[archives.length];
  121.           for (int i = 0; i < archives.length;i++){
  122.             fileSystem = FileSystem.get(archives[i], conf);
  123.             fileStatus = fileSystem.getFileStatus(
  124.                                       new Path(archives[i].getPath()));
  125.             String cacheId = DistributedCache.makeRelative(archives[i],conf);
  126.             String cachePath = TaskTracker.getCacheSubdir() + 
  127.                                  Path.SEPARATOR + cacheId;
  128.             
  129.             localPath = lDirAlloc.getLocalPathForWrite(cachePath,
  130.                                       fileStatus.getLen(), conf);
  131.             baseDir = localPath.toString().replace(cacheId, "");
  132.             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
  133.                                                   new Path(baseDir),
  134.                                                   fileStatus,
  135.                                                   true, Long.parseLong(
  136.                                                         archivesTimestamps[i]),
  137.                                                   new Path(workDir.
  138.                                                         getAbsolutePath()), 
  139.                                                   false);
  140.             
  141.           }
  142.           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
  143.         }
  144.         if ((files != null)) {
  145.           String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
  146.           Path[] p = new Path[files.length];
  147.           for (int i = 0; i < files.length;i++){
  148.             fileSystem = FileSystem.get(files[i], conf);
  149.             fileStatus = fileSystem.getFileStatus(
  150.                                       new Path(files[i].getPath()));
  151.             String cacheId = DistributedCache.makeRelative(files[i], conf);
  152.             String cachePath = TaskTracker.getCacheSubdir() +
  153.                                  Path.SEPARATOR + cacheId;
  154.             
  155.             localPath = lDirAlloc.getLocalPathForWrite(cachePath,
  156.                                       fileStatus.getLen(), conf);
  157.             baseDir = localPath.toString().replace(cacheId, "");
  158.             p[i] = DistributedCache.getLocalCache(files[i], conf, 
  159.                                                   new Path(baseDir),
  160.                                                   fileStatus,
  161.                                                   false, Long.parseLong(
  162.                                                            fileTimestamps[i]),
  163.                                                   new Path(workDir.
  164.                                                         getAbsolutePath()), 
  165.                                                   false);
  166.           }
  167.           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
  168.         }
  169.         Path localTaskFile = new Path(t.getJobFile());
  170.         FileSystem localFs = FileSystem.getLocal(conf);
  171.         localFs.delete(localTaskFile, true);
  172.         OutputStream out = localFs.create(localTaskFile);
  173.         try {
  174.           conf.writeXml(out);
  175.         } finally {
  176.           out.close();
  177.         }
  178.       }
  179.           
  180.       if (!prepare()) {
  181.         return;
  182.       }
  183.       String sep = System.getProperty("path.separator");
  184.       StringBuffer classPath = new StringBuffer();
  185.       // start with same classpath as parent process
  186.       classPath.append(System.getProperty("java.class.path"));
  187.       classPath.append(sep);
  188.       if (!workDir.mkdirs()) {
  189.         if (!workDir.isDirectory()) {
  190.           LOG.fatal("Mkdirs failed to create " + workDir.toString());
  191.         }
  192.       }
  193.   
  194.       String jar = conf.getJar();
  195.       if (jar != null) {       
  196.         // if jar exists, it into workDir
  197.         File[] libs = new File(jobCacheDir, "lib").listFiles();
  198.         if (libs != null) {
  199.           for (int i = 0; i < libs.length; i++) {
  200.             classPath.append(sep);            // add libs from jar to classpath
  201.             classPath.append(libs[i]);
  202.           }
  203.         }
  204.         classPath.append(sep);
  205.         classPath.append(new File(jobCacheDir, "classes"));
  206.         classPath.append(sep);
  207.         classPath.append(jobCacheDir);
  208.        
  209.       }
  210.       // include the user specified classpath
  211.   
  212.       //archive paths
  213.       Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
  214.       if (archiveClasspaths != null && archives != null) {
  215.         Path[] localArchives = DistributedCache
  216.           .getLocalCacheArchives(conf);
  217.         if (localArchives != null){
  218.           for (int i=0;i<archives.length;i++){
  219.             for(int j=0;j<archiveClasspaths.length;j++){
  220.               if (archives[i].getPath().equals(
  221.                                                archiveClasspaths[j].toString())){
  222.                 classPath.append(sep);
  223.                 classPath.append(localArchives[i]
  224.                                  .toString());
  225.               }
  226.             }
  227.           }
  228.         }
  229.       }
  230.       //file paths
  231.       Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
  232.       if (fileClasspaths!=null && files != null) {
  233.         Path[] localFiles = DistributedCache
  234.           .getLocalCacheFiles(conf);
  235.         if (localFiles != null) {
  236.           for (int i = 0; i < files.length; i++) {
  237.             for (int j = 0; j < fileClasspaths.length; j++) {
  238.               if (files[i].getPath().equals(
  239.                                             fileClasspaths[j].toString())) {
  240.                 classPath.append(sep);
  241.                 classPath.append(localFiles[i].toString());
  242.               }
  243.             }
  244.           }
  245.         }
  246.       }
  247.       classPath.append(sep);
  248.       classPath.append(workDir);
  249.       //  Build exec child jmv args.
  250.       Vector<String> vargs = new Vector<String>(8);
  251.       File jvm =                                  // use same jvm as parent
  252.         new File(new File(System.getProperty("java.home"), "bin"), "java");
  253.       vargs.add(jvm.toString());
  254.       // Add child (task) java-vm options.
  255.       //
  256.       // The following symbols if present in mapred.child.java.opts value are
  257.       // replaced:
  258.       // + @taskid@ is interpolated with value of TaskID.
  259.       // Other occurrences of @ will not be altered.
  260.       //
  261.       // Example with multiple arguments and substitutions, showing
  262.       // jvm GC logging, and start of a passwordless JVM JMX agent so can
  263.       // connect with jconsole and the likes to watch child memory, threads
  264.       // and get thread dumps.
  265.       //
  266.       //  <property>
  267.       //    <name>mapred.child.java.opts</name>
  268.       //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc 
  269.       //           -Dcom.sun.management.jmxremote.authenticate=false 
  270.       //           -Dcom.sun.management.jmxremote.ssl=false 
  271.       //    </value>
  272.       //  </property>
  273.       //
  274.       String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  275.       javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  276.       String [] javaOptsSplit = javaOpts.split(" ");
  277.       
  278.       // Add java.library.path; necessary for loading native libraries.
  279.       //
  280.       // 1. To support native-hadoop library i.e. libhadoop.so, we add the 
  281.       //    parent processes' java.library.path to the child. 
  282.       // 2. We also add the 'cwd' of the task to it's java.library.path to help 
  283.       //    users distribute native libraries via the DistributedCache.
  284.       // 3. The user can also specify extra paths to be added to the 
  285.       //    java.library.path via mapred.child.java.opts.
  286.       //
  287.       String libraryPath = System.getProperty("java.library.path");
  288.       if (libraryPath == null) {
  289.         libraryPath = workDir.getAbsolutePath();
  290.       } else {
  291.         libraryPath += sep + workDir;
  292.       }
  293.       boolean hasUserLDPath = false;
  294.       for(int i=0; i<javaOptsSplit.length ;i++) { 
  295.         if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
  296.           javaOptsSplit[i] += sep + libraryPath;
  297.           hasUserLDPath = true;
  298.           break;
  299.         }
  300.       }
  301.       if(!hasUserLDPath) {
  302.         vargs.add("-Djava.library.path=" + libraryPath);
  303.       }
  304.       for (int i = 0; i < javaOptsSplit.length; i++) {
  305.         vargs.add(javaOptsSplit[i]);
  306.       }
  307.       // add java.io.tmpdir given by mapred.child.tmp
  308.       String tmp = conf.get("mapred.child.tmp", "./tmp");
  309.       Path tmpDir = new Path(tmp);
  310.       
  311.       // if temp directory path is not absolute 
  312.       // prepend it with workDir.
  313.       if (!tmpDir.isAbsolute()) {
  314.         tmpDir = new Path(workDir.toString(), tmp);
  315.       }
  316.       FileSystem localFs = FileSystem.getLocal(conf);
  317.       if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
  318.         throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  319.       }
  320.       vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  321.       // Add classpath.
  322.       vargs.add("-classpath");
  323.       vargs.add(classPath.toString());
  324.       // Setup the log4j prop
  325.       long logSize = TaskLog.getTaskLogLength(conf);
  326.       vargs.add("-Dhadoop.log.dir=" + 
  327.           new File(System.getProperty("hadoop.log.dir")
  328.           ).getAbsolutePath());
  329.       vargs.add("-Dhadoop.root.logger=INFO,TLA");
  330.       vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  331.       vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  332.       if (conf.getProfileEnabled()) {
  333.         if (conf.getProfileTaskRange(t.isMapTask()
  334.                                      ).isIncluded(t.getPartition())) {
  335.           File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
  336.           vargs.add(String.format(conf.getProfileParams(), prof.toString()));
  337.         }
  338.       }
  339.       // Add main class and its arguments 
  340.       vargs.add(Child.class.getName());  // main of Child
  341.       // pass umbilical address
  342.       InetSocketAddress address = tracker.getTaskTrackerReportAddress();
  343.       vargs.add(address.getAddress().getHostAddress()); 
  344.       vargs.add(Integer.toString(address.getPort())); 
  345.       vargs.add(taskid.toString());                      // pass task identifier
  346.       String pidFile = lDirAlloc.getLocalPathForWrite(
  347.             (TaskTracker.getPidFile(t.getJobID().toString(), 
  348.              taskid.toString(), t.isTaskCleanupTask())),
  349.             this.conf).toString();
  350.       t.setPidFile(pidFile);
  351.       tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
  352.       // set memory limit using ulimit if feasible and necessary ...
  353.       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
  354.       List<String> setup = null;
  355.       if (ulimitCmd != null) {
  356.         setup = new ArrayList<String>();
  357.         for (String arg : ulimitCmd) {
  358.           setup.add(arg);
  359.         }
  360.       }
  361.       // Set up the redirection of the task's stdout and stderr streams
  362.       File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
  363.       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
  364.       stdout.getParentFile().mkdirs();
  365.       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
  366.       Map<String, String> env = new HashMap<String, String>();
  367.       StringBuffer ldLibraryPath = new StringBuffer();
  368.       ldLibraryPath.append(workDir.toString());
  369.       String oldLdLibraryPath = null;
  370.       oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
  371.       if (oldLdLibraryPath != null) {
  372.         ldLibraryPath.append(sep);
  373.         ldLibraryPath.append(oldLdLibraryPath);
  374.       }
  375.       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
  376.       jvmManager.launchJvm(this, 
  377.           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
  378.               workDir, env, pidFile, conf));
  379.       synchronized (lock) {
  380.         while (!done) {
  381.           lock.wait();
  382.         }
  383.       }
  384.       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
  385.       if (exitCodeSet) {
  386.         if (!killed && exitCode != 0) {
  387.           if (exitCode == 65) {
  388.             tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
  389.           }
  390.           throw new IOException("Task process exit with nonzero status of " +
  391.               exitCode + ".");
  392.         }
  393.       }
  394.     } catch (FSError e) {
  395.       LOG.fatal("FSError", e);
  396.       try {
  397.         tracker.fsError(t.getTaskID(), e.getMessage());
  398.       } catch (IOException ie) {
  399.         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
  400.       }
  401.     } catch (Throwable throwable) {
  402.       LOG.warn(t.getTaskID()+" Child Error", throwable);
  403.       ByteArrayOutputStream baos = new ByteArrayOutputStream();
  404.       throwable.printStackTrace(new PrintStream(baos));
  405.       try {
  406.         tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
  407.       } catch (IOException e) {
  408.         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
  409.       }
  410.     } finally {
  411.       try{
  412.         URI[] archives = DistributedCache.getCacheArchives(conf);
  413.         URI[] files = DistributedCache.getCacheFiles(conf);
  414.         if (archives != null){
  415.           for (int i = 0; i < archives.length; i++){
  416.             DistributedCache.releaseCache(archives[i], conf);
  417.           }
  418.         }
  419.         if (files != null){
  420.           for(int i = 0; i < files.length; i++){
  421.             DistributedCache.releaseCache(files[i], conf);
  422.           }
  423.         }
  424.       }catch(IOException ie){
  425.         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
  426.       }
  427.       tip.reportTaskFinished();
  428.     }
  429.   }
  430.   
  431.   //Mostly for setting up the symlinks. Note that when we setup the distributed
  432.   //cache, we didn't create the symlinks. This is done on a per task basis
  433.   //by the currently executing task.
  434.   public static void setupWorkDir(JobConf conf) throws IOException {
  435.     File workDir = new File(".").getAbsoluteFile();
  436.     FileUtil.fullyDelete(workDir);
  437.     if (DistributedCache.getSymlink(conf)) {
  438.       URI[] archives = DistributedCache.getCacheArchives(conf);
  439.       URI[] files = DistributedCache.getCacheFiles(conf);
  440.       Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
  441.       Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
  442.       if (archives != null) {
  443.         for (int i = 0; i < archives.length; i++) {
  444.           String link = archives[i].getFragment();
  445.           if (link != null) {
  446.             link = workDir.toString() + Path.SEPARATOR + link;
  447.             File flink = new File(link);
  448.             if (!flink.exists()) {
  449.               FileUtil.symLink(localArchives[i].toString(), link);
  450.             }
  451.           }
  452.         }
  453.       }
  454.       if (files != null) {
  455.         for (int i = 0; i < files.length; i++) {
  456.           String link = files[i].getFragment();
  457.           if (link != null) {
  458.             link = workDir.toString() + Path.SEPARATOR + link;
  459.             File flink = new File(link);
  460.             if (!flink.exists()) {
  461.               FileUtil.symLink(localFiles[i].toString(), link);
  462.             }
  463.           }
  464.         }
  465.       }
  466.     }
  467.     File jobCacheDir = null;
  468.     if (conf.getJar() != null) {
  469.       jobCacheDir = new File(
  470.           new Path(conf.getJar()).getParent().toString());
  471.     }
  472.     // create symlinks for all the files in job cache dir in current
  473.     // workingdir for streaming
  474.     try{
  475.       DistributedCache.createAllSymlink(conf, jobCacheDir,
  476.           workDir);
  477.     } catch(IOException ie){
  478.       // Do not exit even if symlinks have not been created.
  479.       LOG.warn(StringUtils.stringifyException(ie));
  480.     }
  481.     // add java.io.tmpdir given by mapred.child.tmp
  482.     String tmp = conf.get("mapred.child.tmp", "./tmp");
  483.     Path tmpDir = new Path(tmp);
  484.     // if temp directory path is not absolute
  485.     // prepend it with workDir.
  486.     if (!tmpDir.isAbsolute()) {
  487.       tmpDir = new Path(workDir.toString(), tmp);
  488.       FileSystem localFs = FileSystem.getLocal(conf);
  489.       if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
  490.         throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  491.       }
  492.     }
  493.   }
  494.   /**
  495.    * Kill the child process
  496.    */
  497.   public void kill() {
  498.     killed = true;
  499.     jvmManager.taskKilled(this);
  500.     signalDone();
  501.   }
  502.   public void signalDone() {
  503.     synchronized (lock) {
  504.       done = true;
  505.       lock.notify();
  506.     }
  507.   }
  508.   public void setExitCode(int exitCode) {
  509.     this.exitCodeSet = true;
  510.     this.exitCode = exitCode;
  511.   }
  512. }