JvmManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.util.ArrayList;
  22. import java.util.HashMap;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Random;
  27. import java.util.Vector;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.fs.FileUtil;
  31. import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
  32. import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  33. class JvmManager {
  34.   public static final Log LOG =
  35.     LogFactory.getLog("org.apache.hadoop.mapred.JvmManager");
  36.   JvmManagerForType mapJvmManager;
  37.   JvmManagerForType reduceJvmManager;
  38.   
  39.   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
  40.       File stdout,File stderr,long logSize, File workDir, 
  41.       Map<String,String> env, String pidFile, JobConf conf) {
  42.     return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
  43.   }
  44.   
  45.   public JvmManager(TaskTracker tracker) {
  46.     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
  47.         true);
  48.     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
  49.         false);
  50.   }
  51.   
  52.   public void stop() {
  53.     mapJvmManager.stop();
  54.     reduceJvmManager.stop();
  55.   }
  56.   public boolean isJvmKnown(JVMId jvmId) {
  57.     if (jvmId.isMapJVM()) {
  58.       return mapJvmManager.isJvmknown(jvmId);
  59.     } else {
  60.       return reduceJvmManager.isJvmknown(jvmId);
  61.     }
  62.   }
  63.   public void launchJvm(TaskRunner t, JvmEnv env) {
  64.     if (t.getTask().isMapTask()) {
  65.       mapJvmManager.reapJvm(t, env);
  66.     } else {
  67.       reduceJvmManager.reapJvm(t, env);
  68.     }
  69.   }
  70.   public TaskInProgress getTaskForJvm(JVMId jvmId) {
  71.     if (jvmId.isMapJVM()) {
  72.       return mapJvmManager.getTaskForJvm(jvmId);
  73.     } else {
  74.       return reduceJvmManager.getTaskForJvm(jvmId);
  75.     }
  76.   }
  77.   public void taskFinished(TaskRunner tr) {
  78.     if (tr.getTask().isMapTask()) {
  79.       mapJvmManager.taskFinished(tr);
  80.     } else {
  81.       reduceJvmManager.taskFinished(tr);
  82.     }
  83.   }
  84.   public void taskKilled(TaskRunner tr) {
  85.     if (tr.getTask().isMapTask()) {
  86.       mapJvmManager.taskKilled(tr);
  87.     } else {
  88.       reduceJvmManager.taskKilled(tr);
  89.     }
  90.   }
  91.   public void killJvm(JVMId jvmId) {
  92.     if (jvmId.isMap) {
  93.       mapJvmManager.killJvm(jvmId);
  94.     } else {
  95.       reduceJvmManager.killJvm(jvmId);
  96.     }
  97.   }  
  98.   private static class JvmManagerForType {
  99.     //Mapping from the JVM IDs to running Tasks
  100.     Map <JVMId,TaskRunner> jvmToRunningTask = 
  101.       new HashMap<JVMId, TaskRunner>();
  102.     //Mapping from the tasks to JVM IDs
  103.     Map <TaskRunner,JVMId> runningTaskToJvm = 
  104.       new HashMap<TaskRunner, JVMId>();
  105.     //Mapping from the JVM IDs to Reduce JVM processes
  106.     Map <JVMId, JvmRunner> jvmIdToRunner = 
  107.       new HashMap<JVMId, JvmRunner>();
  108.     int maxJvms;
  109.     boolean isMap;
  110.     
  111.     Random rand = new Random(System.currentTimeMillis());
  112.     public JvmManagerForType(int maxJvms, boolean isMap) {
  113.       this.maxJvms = maxJvms;
  114.       this.isMap = isMap;
  115.     }
  116.     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
  117.         TaskRunner t) {
  118.       jvmToRunningTask.put(jvmId, t);
  119.       runningTaskToJvm.put(t,jvmId);
  120.       jvmIdToRunner.get(jvmId).setBusy(true);
  121.     }
  122.     
  123.     synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
  124.       if (jvmToRunningTask.containsKey(jvmId)) {
  125.         return jvmToRunningTask.get(jvmId).getTaskInProgress();
  126.       }
  127.       return null;
  128.     }
  129.     
  130.     synchronized public boolean isJvmknown(JVMId jvmId) {
  131.       return jvmIdToRunner.containsKey(jvmId);
  132.     }
  133.     synchronized public void taskFinished(TaskRunner tr) {
  134.       JVMId jvmId = runningTaskToJvm.remove(tr);
  135.       if (jvmId != null) {
  136.         jvmToRunningTask.remove(jvmId);
  137.         JvmRunner jvmRunner;
  138.         if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
  139.           jvmRunner.taskRan();
  140.         }
  141.       }
  142.     }
  143.     synchronized public void taskKilled(TaskRunner tr) {
  144.       JVMId jvmId = runningTaskToJvm.remove(tr);
  145.       if (jvmId != null) {
  146.         jvmToRunningTask.remove(jvmId);
  147.         killJvm(jvmId);
  148.       }
  149.     }
  150.     synchronized public void killJvm(JVMId jvmId) {
  151.       JvmRunner jvmRunner;
  152.       if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
  153.         jvmRunner.kill();
  154.       }
  155.     }
  156.     
  157.     synchronized public void stop() {
  158.       //since the kill() method invoked later on would remove
  159.       //an entry from the jvmIdToRunner map, we create a
  160.       //copy of the values and iterate over it (if we don't
  161.       //make a copy, we will encounter concurrentModification
  162.       //exception
  163.       List <JvmRunner> list = new ArrayList<JvmRunner>();
  164.       list.addAll(jvmIdToRunner.values());
  165.       for (JvmRunner jvm : list) {
  166.         jvm.kill();
  167.       }
  168.     }
  169.     
  170.     synchronized private void removeJvm(JVMId jvmId) {
  171.       jvmIdToRunner.remove(jvmId);
  172.     }
  173.     private synchronized void reapJvm( 
  174.         TaskRunner t, JvmEnv env) {
  175.       if (t.getTaskInProgress().wasKilled()) {
  176.         //the task was killed in-flight
  177.         //no need to do the rest of the operations
  178.         return;
  179.       }
  180.       boolean spawnNewJvm = false;
  181.       JobID jobId = t.getTask().getJobID();
  182.       //Check whether there is a free slot to start a new JVM.
  183.       //,or, Kill a (idle) JVM and launch a new one
  184.       //When this method is called, we *must* 
  185.       // (1) spawn a new JVM (if we are below the max) 
  186.       // (2) find an idle JVM (that belongs to the same job), or,
  187.       // (3) kill an idle JVM (from a different job) 
  188.       // (the order of return is in the order above)
  189.       int numJvmsSpawned = jvmIdToRunner.size();
  190.       JvmRunner runnerToKill = null;
  191.       if (numJvmsSpawned >= maxJvms) {
  192.         //go through the list of JVMs for all jobs.
  193.         Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 
  194.           jvmIdToRunner.entrySet().iterator();
  195.         
  196.         while (jvmIter.hasNext()) {
  197.           JvmRunner jvmRunner = jvmIter.next().getValue();
  198.           JobID jId = jvmRunner.jvmId.getJobId();
  199.           //look for a free JVM for this job; if one exists then just break
  200.           if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
  201.             setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
  202.             LOG.info("No new JVM spawned for jobId/taskid: " + 
  203.                      jobId+"/"+t.getTask().getTaskID() +
  204.                      ". Attempting to reuse: " + jvmRunner.jvmId);
  205.             return;
  206.           }
  207.           //Cases when a JVM is killed: 
  208.           // (1) the JVM under consideration belongs to the same job 
  209.           //     (passed in the argument). In this case, kill only when
  210.           //     the JVM ran all the tasks it was scheduled to run (in terms
  211.           //     of count).
  212.           // (2) the JVM under consideration belongs to a different job and is
  213.           //     currently not busy
  214.           //But in both the above cases, we see if we can assign the current
  215.           //task to an idle JVM (hence we continue the loop even on a match)
  216.           if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
  217.               (!jId.equals(jobId) && !jvmRunner.isBusy())) {
  218.             runnerToKill = jvmRunner;
  219.             spawnNewJvm = true;
  220.           }
  221.         }
  222.       } else {
  223.         spawnNewJvm = true;
  224.       }
  225.       if (spawnNewJvm) {
  226.         if (runnerToKill != null) {
  227.           LOG.info("Killing JVM: " + runnerToKill.jvmId);
  228.           runnerToKill.kill();
  229.         }
  230.         spawnNewJvm(jobId, env, t);
  231.         return;
  232.       }
  233.       //*MUST* never reach this
  234.       throw new RuntimeException("Inconsistent state!!! " +
  235.        "JVM Manager reached an unstable state " +
  236.             "while reaping a JVM for task: " + t.getTask().getTaskID()+
  237.             " " + getDetails());
  238.     }
  239.     
  240.     private String getDetails() {
  241.       StringBuffer details = new StringBuffer();
  242.       details.append("Number of active JVMs:").
  243.               append(jvmIdToRunner.size());
  244.       Iterator<JVMId> jvmIter = 
  245.         jvmIdToRunner.keySet().iterator();
  246.       while (jvmIter.hasNext()) {
  247.         JVMId jvmId = jvmIter.next();
  248.         details.append("n  JVMId ").
  249.           append(jvmId.toString()).
  250.           append(" #Tasks ran: "). 
  251.           append(jvmIdToRunner.get(jvmId).numTasksRan).
  252.           append(" Currently busy? ").
  253.           append(jvmIdToRunner.get(jvmId).busy).
  254.           append(" Currently running: "). 
  255.           append(jvmToRunningTask.get(jvmId).getTask().getTaskID().toString());
  256.       }
  257.       return details.toString();
  258.     }
  259.     private void spawnNewJvm(JobID jobId, JvmEnv env,  
  260.         TaskRunner t) {
  261.       JvmRunner jvmRunner = new JvmRunner(env,jobId);
  262.       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
  263.       //spawn the JVM in a new thread. Note that there will be very little
  264.       //extra overhead of launching the new thread for a new JVM since
  265.       //most of the cost is involved in launching the process. Moreover,
  266.       //since we are going to be using the JVM for running many tasks,
  267.       //the thread launch cost becomes trivial when amortized over all
  268.       //tasks. Doing it this way also keeps code simple.
  269.       jvmRunner.setDaemon(true);
  270.       jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
  271.       setRunningTaskForJvm(jvmRunner.jvmId, t);
  272.       LOG.info(jvmRunner.getName());
  273.       jvmRunner.start();
  274.     }
  275.     synchronized private void updateOnJvmExit(JVMId jvmId, 
  276.         int exitCode, boolean killed) {
  277.       removeJvm(jvmId);
  278.       TaskRunner t = jvmToRunningTask.remove(jvmId);
  279.       if (t != null) {
  280.         runningTaskToJvm.remove(t);
  281.         if (!killed && exitCode != 0) {
  282.           t.setExitCode(exitCode);
  283.         }
  284.         t.signalDone();
  285.       }
  286.     }
  287.     private class JvmRunner extends Thread {
  288.       JvmEnv env;
  289.       volatile boolean killed = false;
  290.       volatile int numTasksRan;
  291.       final int numTasksToRun;
  292.       JVMId jvmId;
  293.       volatile boolean busy = true;
  294.       private ShellCommandExecutor shexec; // shell terminal for running the task
  295.       public JvmRunner(JvmEnv env, JobID jobId) {
  296.         this.env = env;
  297.         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
  298.         this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
  299.         LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
  300.       }
  301.       public void run() {
  302.         runChild(env);
  303.       }
  304.       public void runChild(JvmEnv env) {
  305.         try {
  306.           env.vargs.add(Integer.toString(jvmId.getId()));
  307.           List<String> wrappedCommand = 
  308.             TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
  309.                 env.logSize, env.pidFile);
  310.           shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
  311.               env.workDir, env.env);
  312.           shexec.execute();
  313.         } catch (IOException ioe) {
  314.           // do nothing
  315.           // error and output are appropriately redirected
  316.         } finally { // handle the exit code
  317.           if (shexec == null) {
  318.             return;
  319.           }
  320.           int exitCode = shexec.getExitCode();
  321.           updateOnJvmExit(jvmId, exitCode, killed);
  322.           LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
  323.               numTasksRan);
  324.           try {
  325.             // In case of jvm-reuse,
  326.             //the task jvm cleans up the common workdir for every 
  327.             //task at the beginning of each task in the task JVM.
  328.             //For the last task, we do it here.
  329.             if (env.conf.getNumTasksToExecutePerJvm() != 1) {
  330.               FileUtil.fullyDelete(env.workDir);
  331.             }
  332.           } catch (IOException ie){}
  333.         }
  334.       }
  335.       public void kill() {
  336.         if (shexec != null) {
  337.           Process process = shexec.getProcess();
  338.           if (process != null) {
  339.             process.destroy();
  340.           }
  341.         }
  342.         removeJvm(jvmId);
  343.       }
  344.       
  345.       public void taskRan() {
  346.         busy = false;
  347.         numTasksRan++;
  348.       }
  349.       
  350.       public boolean ranAll() {
  351.         return(numTasksRan == numTasksToRun);
  352.       }
  353.       public void setBusy(boolean busy) {
  354.         this.busy = busy;
  355.       }
  356.       public boolean isBusy() {
  357.         return busy;
  358.       }
  359.     }
  360.   }  
  361.   static class JvmEnv { //Helper class
  362.     List<String> vargs;
  363.     List<String> setup;
  364.     File stdout;
  365.     File stderr;
  366.     File workDir;
  367.     String pidFile;
  368.     long logSize;
  369.     JobConf conf;
  370.     Map<String, String> env;
  371.     public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
  372.         File stderr, long logSize, File workDir, Map<String,String> env,
  373.         String pidFile, JobConf conf) {
  374.       this.setup = setup;
  375.       this.vargs = vargs;
  376.       this.stdout = stdout;
  377.       this.stderr = stderr;
  378.       this.workDir = workDir;
  379.       this.env = env;
  380.       this.pidFile = pidFile;
  381.       this.conf = conf;
  382.     }
  383.   }
  384. }