EagerTaskInitializationListener.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collections;
  22. import java.util.Comparator;
  23. import java.util.List;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import org.apache.commons.logging.Log;
  27. import org.apache.commons.logging.LogFactory;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  30. import org.apache.hadoop.util.StringUtils;
  31. /**
  32.  * A {@link JobInProgressListener} which initializes the tasks for a job as soon
  33.  * as the job is added (using the {@link #jobAdded(JobInProgress)} method).
  34.  */
  35. class EagerTaskInitializationListener extends JobInProgressListener {
  36.   
  37.   private static final int DEFAULT_NUM_THREADS = 4;
  38.   private static final Log LOG = LogFactory.getLog(
  39.       EagerTaskInitializationListener.class.getName());
  40.   
  41.   /////////////////////////////////////////////////////////////////
  42.   //  Used to init new jobs that have just been created
  43.   /////////////////////////////////////////////////////////////////
  44.   class JobInitManager implements Runnable {
  45.    
  46.     public void run() {
  47.       JobInProgress job = null;
  48.       while (true) {
  49.         try {
  50.           synchronized (jobInitQueue) {
  51.             while (jobInitQueue.isEmpty()) {
  52.               jobInitQueue.wait();
  53.             }
  54.             job = jobInitQueue.remove(0);
  55.           }
  56.           threadPool.execute(new InitJob(job));
  57.         } catch (InterruptedException t) {
  58.           LOG.info("JobInitManagerThread interrupted.");
  59.           break;
  60.         } 
  61.       }
  62.       LOG.info("Shutting down thread pool");
  63.       threadPool.shutdownNow();
  64.     }
  65.   }
  66.   
  67.   static class InitJob implements Runnable {
  68.   
  69.     private JobInProgress job;
  70.     
  71.     public InitJob(JobInProgress job) {
  72.       this.job = job;
  73.     }
  74.     
  75.     public void run() {
  76.       try {
  77.         LOG.info("Initializing " + job.getJobID());
  78.         job.initTasks();
  79.       } catch (Throwable t) {
  80.         LOG.error("Job initialization failed:n" +
  81.             StringUtils.stringifyException(t));
  82.         if (job != null) {
  83.           job.fail();
  84.         }
  85.       }
  86.     }
  87.   }
  88.   
  89.   private JobInitManager jobInitManager = new JobInitManager();
  90.   private Thread jobInitManagerThread;
  91.   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
  92.   private ExecutorService threadPool;
  93.   private int numThreads;
  94.   
  95.   public EagerTaskInitializationListener(Configuration conf) {
  96.     numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
  97.     threadPool = Executors.newFixedThreadPool(numThreads);
  98.   }
  99.   
  100.   public void start() throws IOException {
  101.     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
  102.     jobInitManagerThread.setDaemon(true);
  103.     this.jobInitManagerThread.start();
  104.   }
  105.   
  106.   public void terminate() throws IOException {
  107.     if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
  108.       LOG.info("Stopping Job Init Manager thread");
  109.       jobInitManagerThread.interrupt();
  110.       try {
  111.         jobInitManagerThread.join();
  112.       } catch (InterruptedException ex) {
  113.         ex.printStackTrace();
  114.       }
  115.     }
  116.   }
  117.   /**
  118.    * We add the JIP to the jobInitQueue, which is processed 
  119.    * asynchronously to handle split-computation and build up
  120.    * the right TaskTracker/Block mapping.
  121.    */
  122.   @Override
  123.   public void jobAdded(JobInProgress job) {
  124.     synchronized (jobInitQueue) {
  125.       jobInitQueue.add(job);
  126.       resortInitQueue();
  127.       jobInitQueue.notifyAll();
  128.     }
  129.   }
  130.   
  131.   /**
  132.    * Sort jobs by priority and then by start time.
  133.    */
  134.   private synchronized void resortInitQueue() {
  135.     Comparator<JobInProgress> comp = new Comparator<JobInProgress>() {
  136.       public int compare(JobInProgress o1, JobInProgress o2) {
  137.         int res = o1.getPriority().compareTo(o2.getPriority());
  138.         if(res == 0) {
  139.           if(o1.getStartTime() < o2.getStartTime())
  140.             res = -1;
  141.           else
  142.             res = (o1.getStartTime()==o2.getStartTime() ? 0 : 1);
  143.         }
  144.           
  145.         return res;
  146.       }
  147.     };
  148.     
  149.     synchronized (jobInitQueue) {
  150.       Collections.sort(jobInitQueue, comp);
  151.     }
  152.   }
  153.   @Override
  154.   public void jobRemoved(JobInProgress job) {
  155.     synchronized (jobInitQueue) {
  156.       jobInitQueue.remove(job);
  157.     }
  158.   }
  159.   @Override
  160.   public void jobUpdated(JobChangeEvent event) {
  161.     if (event instanceof JobStatusChangeEvent) {
  162.       jobStateChanged((JobStatusChangeEvent)event);
  163.     }
  164.   }
  165.   
  166.   // called when the job's status is changed
  167.   private void jobStateChanged(JobStatusChangeEvent event) {
  168.     // Resort the job queue if the job-start-time or job-priority changes
  169.     if (event.getEventType() == EventType.START_TIME_CHANGED
  170.         || event.getEventType() == EventType.PRIORITY_CHANGED) {
  171.       synchronized (jobInitQueue) {
  172.         resortInitQueue();
  173.       }
  174.     }
  175.   }
  176. }