JobInitializationPoller.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:21k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.ArrayList;
  20. import java.util.Collection;
  21. import java.util.HashMap;
  22. import java.util.HashSet;
  23. import java.util.Iterator;
  24. import java.util.Set;
  25. import java.util.TreeMap;
  26. import java.util.Map.Entry;
  27. import java.util.concurrent.atomic.AtomicInteger;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
  31. import org.apache.hadoop.util.StringUtils;
  32. /**
  33.  * This class asynchronously initializes jobs submitted to the
  34.  * Map/Reduce cluster running with the {@link CapacityTaskScheduler}.
  35.  *
  36.  * <p>
  37.  * The class comprises of a main poller thread, and a set of worker
  38.  * threads that together initialize the jobs. The poller thread periodically
  39.  * looks at jobs submitted to the scheduler, and selects a set of them
  40.  * to be initialized. It passes these to the worker threads for initializing.
  41.  * Each worker thread is configured to look at jobs submitted to a fixed
  42.  * set of queues. It initializes jobs in a round robin manner - selecting
  43.  * the first job in order from each queue ready to be initialized.
  44.  * </p>
  45.  * 
  46.  * <p>
  47.  * An initialized job occupies memory resources on the Job Tracker. Hence,
  48.  * the poller limits the number of jobs initialized at any given time to
  49.  * a configured limit. The limit is specified per user per queue.
  50.  * </p>
  51.  * 
  52.  * <p>
  53.  * However, since a job needs to be initialized before the scheduler can
  54.  * select tasks from it to run, it tries to keep a backlog of jobs 
  55.  * initialized so the scheduler does not need to wait and let empty slots
  56.  * go waste. The core logic of the poller is to pick up the right jobs,
  57.  * which have a good potential to be run next by the scheduler. To do this,
  58.  * it picks up jobs submitted across users and across queues to account
  59.  * both for guaranteed capacities and user limits. It also always initializes
  60.  * high priority jobs, whenever they need to be initialized, even if this
  61.  * means going over the limit for initialized jobs.
  62.  * </p>
  63.  */
  64. public class JobInitializationPoller extends Thread {
  65.   private static final Log LOG = LogFactory
  66.       .getLog(JobInitializationPoller.class.getName());
  67.   /*
  68.    * The poller picks up jobs across users to initialize based on user limits.
  69.    * Suppose the user limit for a queue is 25%, it means atmost 4 users' jobs
  70.    * can run together. However, in order to account for jobs from a user that
  71.    * might complete faster than others, it initializes jobs from an additional
  72.    * number of users as a backlog. This variable defines the additional
  73.    * number of users whose jobs can be considered for initializing. 
  74.    */
  75.   private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
  76.   private JobQueuesManager jobQueueManager;
  77.   private long sleepInterval;
  78.   private int poolSize;
  79.   /**
  80.    * A worker thread that initializes jobs in one or more queues assigned to
  81.    * it.
  82.    *
  83.    * Jobs are initialized in a round robin fashion one from each queue at a
  84.    * time.
  85.    */
  86.   class JobInitializationThread extends Thread {
  87.     private JobInProgress initializingJob;
  88.     private volatile boolean startIniting;
  89.     private AtomicInteger currentJobCount = new AtomicInteger(0); // number of jobs to initialize
  90.     /**
  91.      * The hash map which maintains relationship between queue to jobs to
  92.      * initialize per queue.
  93.      */
  94.     private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
  95.     public JobInitializationThread() {
  96.       startIniting = true;
  97.       jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
  98.     }
  99.     @Override
  100.     public void run() {
  101.       while (startIniting) {
  102.         initializeJobs();  
  103.         try {
  104.           if (startIniting) {
  105.             Thread.sleep(sleepInterval);
  106.           } else {
  107.             break;
  108.           }
  109.         } catch (Throwable t) {
  110.         }
  111.       }
  112.     }
  113.     // The key method that initializes jobs from queues
  114.     // This method is package-private to allow test cases to call it
  115.     // synchronously in a controlled manner.
  116.     void initializeJobs() {
  117.       // while there are more jobs to initialize...
  118.       while (currentJobCount.get() > 0) {
  119.         Set<String> queues = jobsPerQueue.keySet();
  120.         for (String queue : queues) {
  121.           JobInProgress job = getFirstJobInQueue(queue);
  122.           if (job == null) {
  123.             continue;
  124.           }
  125.           LOG.info("Initializing job : " + job.getJobID() + " in Queue "
  126.               + job.getProfile().getQueueName() + " For user : "
  127.               + job.getProfile().getUser());
  128.           try {
  129.             if (startIniting) {
  130.               setInitializingJob(job);
  131.               job.initTasks();
  132.               setInitializingJob(null);
  133.             } else {
  134.               break;
  135.             }
  136.           } catch (Throwable t) {
  137.             LOG.info("Job initialization failed:n"
  138.                 + StringUtils.stringifyException(t));
  139.             if (job != null)
  140.               job.fail();
  141.           }
  142.         }
  143.       }
  144.     }
  145.     /**
  146.      * This method returns the first job in the queue and removes the same.
  147.      * 
  148.      * @param queue
  149.      *          queue name
  150.      * @return First job in the queue and removes it.
  151.      */
  152.     private JobInProgress getFirstJobInQueue(String queue) {
  153.       TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
  154.           .get(queue);
  155.       synchronized (jobsList) {
  156.         if (jobsList.isEmpty()) {
  157.           return null;
  158.         }
  159.         Iterator<JobInProgress> jobIterator = jobsList.values().iterator();
  160.         JobInProgress job = jobIterator.next();
  161.         jobIterator.remove();
  162.         currentJobCount.getAndDecrement();
  163.         return job;
  164.       }
  165.     }
  166.     /*
  167.      * Test method to check if the thread is currently initialising the job
  168.      */
  169.     synchronized JobInProgress getInitializingJob() {
  170.       return this.initializingJob;
  171.     }
  172.     
  173.     synchronized void setInitializingJob(JobInProgress job) {
  174.       this.initializingJob  = job;
  175.     }
  176.     void terminate() {
  177.       startIniting = false;
  178.     }
  179.     void addJobsToQueue(String queue, JobInProgress job) {
  180.       TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
  181.           .get(queue);
  182.       if (jobs == null) {
  183.         LOG.error("Invalid queue passed to the thread : " + queue
  184.             + " For job :: " + job.getJobID());
  185.       }
  186.       synchronized (jobs) {
  187.         JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
  188.         jobs.put(schedInfo, job);
  189.         currentJobCount.getAndIncrement();
  190.       }
  191.     }
  192.     void addQueue(String queue) {
  193.       TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
  194.           jobQueueManager.getComparator(queue));
  195.       jobsPerQueue.put(queue, jobs);
  196.     }
  197.   }
  198.   /**
  199.    * The queue information class maintains following information per queue:
  200.    * Maximum users allowed to initialize job in the particular queue. Maximum
  201.    * jobs allowed to be initialize per user in the queue.
  202.    * 
  203.    */
  204.   private class QueueInfo {
  205.     String queue;
  206.     int maxUsersAllowedToInitialize;
  207.     int maxJobsPerUserToInitialize;
  208.     public QueueInfo(String queue, int maxUsersAllowedToInitialize,
  209.         int maxJobsPerUserToInitialize) {
  210.       this.queue = queue;
  211.       this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
  212.       this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
  213.     }
  214.   }
  215.   /**
  216.    * Map which contains the configuration used for initializing jobs
  217.    * in that associated to a particular job queue.
  218.    */
  219.   private HashMap<String, QueueInfo> jobQueues;
  220.   /**
  221.    * Set of jobs which have been passed to Initialization threads.
  222.    * This is maintained so that we dont call initTasks() for same job twice.
  223.    */
  224.   private HashMap<JobID,JobInProgress> initializedJobs;
  225.   private volatile boolean running;
  226.   /**
  227.    * The map which provides information which thread should be used to
  228.    * initialize jobs for a given job queue.
  229.    */
  230.   private HashMap<String, JobInitializationThread> threadsToQueueMap;
  231.   public JobInitializationPoller(JobQueuesManager mgr,
  232.       CapacitySchedulerConf rmConf, Set<String> queue) {
  233.     initializedJobs = new HashMap<JobID,JobInProgress>();
  234.     jobQueues = new HashMap<String, QueueInfo>();
  235.     this.jobQueueManager = mgr;
  236.     threadsToQueueMap = new HashMap<String, JobInitializationThread>();
  237.     super.setName("JobInitializationPollerThread");
  238.     running = true;
  239.   }
  240.   /*
  241.    * method to read all configuration values required by the initialisation
  242.    * poller
  243.    */
  244.   void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
  245.     for (String queue : queues) {
  246.       int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
  247.       int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
  248.       int maxJobsPerUserToInitialize = capacityConf
  249.           .getMaxJobsPerUserToInitialize(queue);
  250.       QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
  251.           maxJobsPerUserToInitialize);
  252.       jobQueues.put(queue, qi);
  253.     }
  254.     sleepInterval = capacityConf.getSleepInterval();
  255.     poolSize = capacityConf.getMaxWorkerThreads();
  256.     if (poolSize > queues.size()) {
  257.       poolSize = queues.size();
  258.     }
  259.     assignThreadsToQueues();
  260.     Collection<JobInitializationThread> threads = threadsToQueueMap.values();
  261.     for (JobInitializationThread t : threads) {
  262.       if (!t.isAlive()) {
  263.         t.setDaemon(true);
  264.         t.start();
  265.       }
  266.     }
  267.   }
  268.   /**
  269.    * This is main thread of initialization poller, We essentially do 
  270.    * following in the main threads:
  271.    * 
  272.    * <ol>
  273.    * <li> Clean up the list of initialized jobs list which poller maintains
  274.    * </li>
  275.    * <li> Select jobs to initialize in the polling interval.</li>
  276.    * </ol>
  277.    */
  278.   public void run() {
  279.     while (running) {
  280.       try {
  281.         cleanUpInitializedJobsList();
  282.         selectJobsToInitialize();
  283.         if (!this.isInterrupted()) {
  284.           Thread.sleep(sleepInterval);
  285.         }
  286.       } catch (InterruptedException e) {
  287.         LOG.error("Job Initialization poller interrupted"
  288.             + StringUtils.stringifyException(e));
  289.       }
  290.     }
  291.   }
  292.   /**
  293.    * The key method which does selecting jobs to be initalized across 
  294.    * queues and assign those jobs to their appropriate init-worker threads.
  295.    * <br/>
  296.    * This method is overriden in test case which is used to test job
  297.    * initialization poller.
  298.    * 
  299.    */
  300.   void selectJobsToInitialize() {
  301.     for (String queue : jobQueues.keySet()) {
  302.       ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
  303.       printJobs(jobsToInitialize);
  304.       JobInitializationThread t = threadsToQueueMap.get(queue);
  305.       for (JobInProgress job : jobsToInitialize) {
  306.         t.addJobsToQueue(queue, job);
  307.       }
  308.     }
  309.   }
  310.   /**
  311.    * Method used to print log statements about which jobs are being
  312.    * passed to init-threads. 
  313.    * 
  314.    * @param jobsToInitialize list of jobs which are passed to be 
  315.    * init-threads.
  316.    */
  317.   private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
  318.     for (JobInProgress job : jobsToInitialize) {
  319.       LOG.info("Passing to Initializer Job Id :" + job.getJobID()
  320.           + " User: " + job.getProfile().getUser() + " Queue : "
  321.           + job.getProfile().getQueueName());
  322.     }
  323.   }
  324.   /**
  325.    * This method exists to be overridden by test cases that wish to
  326.    * create a test-friendly worker thread which can be controlled
  327.    * synchronously.
  328.    * 
  329.    * @return Instance of worker init-threads.
  330.    */
  331.   JobInitializationThread createJobInitializationThread() {
  332.     return new JobInitializationThread();
  333.   }
  334.   
  335.   /**
  336.    * Method which is used by the poller to assign appropriate worker thread
  337.    * to a queue. The number of threads would be always less than or equal
  338.    * to number of queues in a system. If number of threads is configured to 
  339.    * be more than number of queues then poller does not create threads more
  340.    * than number of queues. 
  341.    * 
  342.    */
  343.   private void assignThreadsToQueues() {
  344.     int countOfQueues = jobQueues.size();
  345.     String[] queues = (String[]) jobQueues.keySet().toArray(
  346.         new String[countOfQueues]);
  347.     int numberOfQueuesPerThread = countOfQueues / poolSize;
  348.     int numberOfQueuesAssigned = 0;
  349.     for (int i = 0; i < poolSize; i++) {
  350.       JobInitializationThread initializer = createJobInitializationThread();
  351.       int batch = (i * numberOfQueuesPerThread);
  352.       for (int j = batch; j < (batch + numberOfQueuesPerThread); j++) {
  353.         initializer.addQueue(queues[j]);
  354.         threadsToQueueMap.put(queues[j], initializer);
  355.         numberOfQueuesAssigned++;
  356.       }
  357.     }
  358.     if (numberOfQueuesAssigned < countOfQueues) {
  359.       // Assign remaining queues in round robin fashion to other queues
  360.       int startIndex = 0;
  361.       for (int i = numberOfQueuesAssigned; i < countOfQueues; i++) {
  362.         JobInitializationThread t = threadsToQueueMap
  363.             .get(queues[startIndex]);
  364.         t.addQueue(queues[i]);
  365.         threadsToQueueMap.put(queues[i], t);
  366.         startIndex++;
  367.       }
  368.     }
  369.   }
  370.   /**
  371.    * 
  372.    * Method used to select jobs to be initialized for a given queue. <br/>
  373.    * 
  374.    * We want to ensure that enough jobs have been initialized, so that when the
  375.    * Scheduler wants to consider a new job to run, it's ready. We clearly don't
  376.    * want to initialize too many jobs as each initialized job has a memory
  377.    * footprint, sometimes significant.
  378.    * 
  379.    * Number of jobs to be initialized is restricted by two values: - Maximum
  380.    * number of users whose jobs we want to initialize, which is equal to 
  381.    * the number of concurrent users the queue can support. - Maximum number 
  382.    * of initialized jobs per user. The product of these two gives us the
  383.    * total number of initialized jobs.
  384.    * 
  385.    * Note that this is a rough number, meant for decreasing extra memory
  386.    * footprint. It's OK if we go over it once in a while, if we have to.
  387.    * 
  388.    * This can happen as follows. Suppose we have initialized 3 jobs for a
  389.    * user. Now, suppose the user submits a job who's priority is higher than
  390.    * that of the 3 jobs initialized. This job needs to be initialized, since it
  391.    * will run earlier than the 3 jobs. We'll now have 4 initialized jobs for the
  392.    * user. If memory becomes a problem, we should ideally un-initialize one of
  393.    * the 3 jobs, to keep the count of initialized jobs at 3, but that's
  394.    * something we don't do for now. This situation can also arise when a new
  395.    * user submits a high priority job, thus superceeding a user whose jobs have
  396.    * already been initialized. The latter user's initialized jobs are redundant,
  397.    * but we'll leave them initialized.
  398.    * 
  399.    * @param queue name of the queue to pick the jobs to initialize.
  400.    * @return list of jobs to be initalized in a queue. An empty queue is
  401.    *         returned if no jobs are found.
  402.    */
  403.   ArrayList<JobInProgress> getJobsToInitialize(String queue) {
  404.     QueueInfo qi = jobQueues.get(queue);
  405.     ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
  406.     // use the configuration parameter which is configured for the particular
  407.     // queue.
  408.     int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
  409.     int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
  410.     int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
  411.         * maxJobsPerUserAllowedToInitialize;
  412.     int countOfJobsInitialized = 0;
  413.     HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
  414.     Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
  415.     /*
  416.      * Walk through the collection of waiting jobs.
  417.      *  We maintain a map of jobs that have already been initialized. If a 
  418.      *  job exists in that map, increment the count for that job's user 
  419.      *  and move on to the next job.
  420.      *   
  421.      *  If the job doesn't exist, see whether we  want to initialize it. 
  422.      *  We initialize it if: - at least one job of the user has already 
  423.      *  been initialized, but the user's total initialized jobs are below 
  424.      *  the limit, OR - this is a new user, and we haven't reached the limit
  425.      *  for the number of users whose jobs we want to initialize. We break 
  426.      *  when we've reached the limit of maximum jobs to initialize.
  427.      */
  428.     for (JobInProgress job : jobs) {
  429.       String user = job.getProfile().getUser();
  430.       int numberOfJobs = userJobsInitialized.get(user) == null ? 0
  431.           : userJobsInitialized.get(user);
  432.       // If the job is already initialized then add the count against user
  433.       // then continue.
  434.       if (initializedJobs.containsKey(job.getJobID())) {
  435.         userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
  436.         countOfJobsInitialized++;
  437.         continue;
  438.       }
  439.       boolean isUserPresent = userJobsInitialized.containsKey(user);
  440.       if (!isUserPresent
  441.           && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
  442.         // this is a new user being considered and the number of users
  443.         // is within limits.
  444.         userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
  445.         jobsToInitialize.add(job);
  446.         initializedJobs.put(job.getJobID(),job);
  447.         countOfJobsInitialized++;
  448.       } else if (isUserPresent
  449.           && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
  450.         userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
  451.         jobsToInitialize.add(job);
  452.         initializedJobs.put(job.getJobID(),job);
  453.         countOfJobsInitialized++;
  454.       }
  455.       /*
  456.        * if the maximum number of jobs to initalize for a queue is reached
  457.        * then we stop looking at further jobs. The jobs beyond this number
  458.        * can be initialized.
  459.        */
  460.       if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
  461.         break;
  462.       }
  463.     }
  464.     return jobsToInitialize;
  465.   }
  466.   /**
  467.    * Method which is used internally to clean up the initialized jobs
  468.    * data structure which the job initialization poller uses to check
  469.    * if a job is initalized or not.
  470.    * 
  471.    * Algorithm for cleaning up task is as follows:
  472.    * 
  473.    * <ul>
  474.    * <li> For jobs in <b>initalizedJobs</b> list </li>
  475.    * <ul>
  476.    * <li> If job is running</li>
  477.    * <ul>
  478.    * <li> If job is scheduled then remove the job from the waiting queue 
  479.    * of the scheduler and <b>initalizedJobs</b>.<br/>
  480.    *  The check for a job is scheduled or not is done by following 
  481.    *  formulae:<br/> 
  482.    *  if pending <i>task</i> &lt; desired <i>task</i> then scheduled else
  483.    *  not scheduled.<br/>
  484.    *  The formulae would return <i>scheduled</i> if one task has run or failed,
  485.    *  any cases in which there has been a failure but not enough to mark task 
  486.    *  as failed, we return <i>not scheduled</i> in formulae.
  487.    * </li>
  488.    * </ul>
  489.    * 
  490.    * <li> If job is complete, then remove the job from <b>initalizedJobs</b>.
  491.    * </li>
  492.    * 
  493.    * </ul>
  494.    * </ul>
  495.    * 
  496.    */
  497.   void cleanUpInitializedJobsList() {
  498.     Iterator<Entry<JobID, JobInProgress>> jobsIterator = 
  499.       initializedJobs.entrySet().iterator();
  500.     while(jobsIterator.hasNext()) {
  501.       Entry<JobID,JobInProgress> entry = jobsIterator.next();
  502.       JobInProgress job = entry.getValue();
  503.       if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  504.         if (isScheduled(job)) {
  505.           LOG.info("Removing scheduled jobs from waiting queue"
  506.               + job.getJobID());
  507.           jobsIterator.remove();
  508.           jobQueueManager.removeJobFromWaitingQueue(job);
  509.           continue;
  510.         }
  511.       }
  512.       if(job.isComplete()) {
  513.         LOG.info("Removing killed/completed job from initalized jobs " +
  514.          "list : "+ job.getJobID());
  515.         jobsIterator.remove();
  516.       }
  517.     }
  518.   }
  519.   /**
  520.    * Convenience method to check if job has been scheduled or not.
  521.    * 
  522.    * The method may return false in case of job which has failure but
  523.    * has not failed the tip.
  524.    * @param job
  525.    * @return
  526.    */
  527.   private boolean isScheduled(JobInProgress job) {
  528.     return ((job.pendingMaps() < job.desiredMaps()) 
  529.         || (job.pendingReduces() < job.desiredReduces()));
  530.   }
  531.   void terminate() {
  532.     running = false;
  533.     for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
  534.         .entrySet()) {
  535.       JobInitializationThread t = entry.getValue();
  536.       if (t.isAlive()) {
  537.         t.terminate();
  538.         t.interrupt();
  539.       }
  540.     }
  541.   }
  542.   /*
  543.    * Test method used only for testing purposes.
  544.    */
  545.   JobInProgress getInitializingJob(String queue) {
  546.     JobInitializationThread t = threadsToQueueMap.get(queue);
  547.     if (t == null) {
  548.       return null;
  549.     } else {
  550.       return t.getInitializingJob();
  551.     }
  552.   }
  553.   Set<JobID> getInitializedJobList() {
  554.     return initializedJobs.keySet();
  555.   }
  556. }