LocalJobRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.HashMap;
  22. import java.util.List;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.BytesWritable;
  28. import org.apache.hadoop.io.DataOutputBuffer;
  29. import org.apache.hadoop.io.serializer.SerializationFactory;
  30. import org.apache.hadoop.io.serializer.Serializer;
  31. import org.apache.hadoop.mapred.JobTrackerMetricsInst;
  32. import org.apache.hadoop.mapred.JvmTask;
  33. import org.apache.hadoop.mapred.JobClient.RawSplit;
  34. import org.apache.hadoop.util.ReflectionUtils;
  35. /** Implements MapReduce locally, in-process, for debugging. */ 
  36. class LocalJobRunner implements JobSubmissionProtocol {
  37.   public static final Log LOG =
  38.     LogFactory.getLog(LocalJobRunner.class);
  39.   private FileSystem fs;
  40.   private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
  41.   private JobConf conf;
  42.   private int map_tasks = 0;
  43.   private int reduce_tasks = 0;
  44.   private JobTrackerInstrumentation myMetrics = null;
  45.   private static final String jobDir =  "localRunner/";
  46.   
  47.   public long getProtocolVersion(String protocol, long clientVersion) {
  48.     return JobSubmissionProtocol.versionID;
  49.   }
  50.   
  51.   private class Job extends Thread
  52.     implements TaskUmbilicalProtocol {
  53.     private Path file;
  54.     private JobID id;
  55.     private JobConf job;
  56.     private JobStatus status;
  57.     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
  58.     private MapOutputFile mapoutputFile;
  59.     private JobProfile profile;
  60.     private Path localFile;
  61.     private FileSystem localFs;
  62.     boolean killed = false;
  63.     
  64.     // Counters summed over all the map/reduce tasks which
  65.     // have successfully completed
  66.     private Counters completedTaskCounters = new Counters();
  67.     
  68.     // Current counters, including incomplete task(s)
  69.     private Counters currentCounters = new Counters();
  70.     public long getProtocolVersion(String protocol, long clientVersion) {
  71.       return TaskUmbilicalProtocol.versionID;
  72.     }
  73.     
  74.     public Job(JobID jobid, JobConf conf) throws IOException {
  75.       this.file = new Path(getSystemDir(), jobid + "/job.xml");
  76.       this.id = jobid;
  77.       this.mapoutputFile = new MapOutputFile(jobid);
  78.       this.mapoutputFile.setConf(conf);
  79.       this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
  80.       this.localFs = FileSystem.getLocal(conf);
  81.       fs.copyToLocalFile(file, localFile);
  82.       this.job = new JobConf(localFile);
  83.       profile = new JobProfile(job.getUser(), id, file.toString(), 
  84.                                "http://localhost:8080/", job.getJobName());
  85.       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
  86.       jobs.put(id, this);
  87.       this.start();
  88.     }
  89.     JobProfile getProfile() {
  90.       return profile;
  91.     }
  92.     
  93.     @Override
  94.     public void run() {
  95.       JobID jobId = profile.getJobID();
  96.       JobContext jContext = new JobContext(conf, jobId);
  97.       OutputCommitter outputCommitter = job.getOutputCommitter();
  98.       try {
  99.         // split input into minimum number of splits
  100.         RawSplit[] rawSplits;
  101.         if (job.getUseNewMapper()) {
  102.           org.apache.hadoop.mapreduce.InputFormat<?,?> input =
  103.               ReflectionUtils.newInstance(jContext.getInputFormatClass(), jContext.getJobConf());
  104.                     
  105.           List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
  106.           rawSplits = new RawSplit[splits.size()];
  107.           DataOutputBuffer buffer = new DataOutputBuffer();
  108.           SerializationFactory factory = new SerializationFactory(conf);
  109.           Serializer serializer = 
  110.             factory.getSerializer(splits.get(0).getClass());
  111.           serializer.open(buffer);
  112.           for (int i = 0; i < splits.size(); i++) {
  113.             buffer.reset();
  114.             serializer.serialize(splits.get(i));
  115.             RawSplit rawSplit = new RawSplit();
  116.             rawSplit.setClassName(splits.get(i).getClass().getName());
  117.             rawSplit.setDataLength(splits.get(i).getLength());
  118.             rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
  119.             rawSplit.setLocations(splits.get(i).getLocations());
  120.             rawSplits[i] = rawSplit;
  121.           }
  122.         } else {
  123.           InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
  124.           rawSplits = new RawSplit[splits.length];
  125.           DataOutputBuffer buffer = new DataOutputBuffer();
  126.           for (int i = 0; i < splits.length; i++) {
  127.             buffer.reset();
  128.             splits[i].write(buffer);
  129.             RawSplit rawSplit = new RawSplit();
  130.             rawSplit.setClassName(splits[i].getClass().getName());
  131.             rawSplit.setDataLength(splits[i].getLength());
  132.             rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
  133.             rawSplit.setLocations(splits[i].getLocations());
  134.             rawSplits[i] = rawSplit;
  135.           }
  136.         }
  137.         
  138.         int numReduceTasks = job.getNumReduceTasks();
  139.         if (numReduceTasks > 1 || numReduceTasks < 0) {
  140.           // we only allow 0 or 1 reducer in local mode
  141.           numReduceTasks = 1;
  142.           job.setNumReduceTasks(1);
  143.         }
  144.         outputCommitter.setupJob(jContext);
  145.         status.setSetupProgress(1.0f);
  146.         
  147.         for (int i = 0; i < rawSplits.length; i++) {
  148.           if (!this.isInterrupted()) {
  149.             TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
  150.             mapIds.add(mapId);
  151.             MapTask map = new MapTask(file.toString(),  
  152.                                       mapId, i,
  153.                                       rawSplits[i].getClassName(),
  154.                                       rawSplits[i].getBytes());
  155.             JobConf localConf = new JobConf(job);
  156.             map.setJobFile(localFile.toString());
  157.             map.localizeConfiguration(localConf);
  158.             map.setConf(localConf);
  159.             map_tasks += 1;
  160.             myMetrics.launchMap(mapId);
  161.             map.run(localConf, this);
  162.             myMetrics.completeMap(mapId);
  163.             map_tasks -= 1;
  164.             updateCounters(map);
  165.           } else {
  166.             throw new InterruptedException();
  167.           }
  168.         }
  169.         TaskAttemptID reduceId = 
  170.           new TaskAttemptID(new TaskID(jobId, false, 0), 0);
  171.         try {
  172.           if (numReduceTasks > 0) {
  173.             // move map output to reduce input  
  174.             for (int i = 0; i < mapIds.size(); i++) {
  175.               if (!this.isInterrupted()) {
  176.                 TaskAttemptID mapId = mapIds.get(i);
  177.                 Path mapOut = this.mapoutputFile.getOutputFile(mapId);
  178.                 Path reduceIn = this.mapoutputFile.getInputFileForWrite(
  179.                                   mapId.getTaskID(),reduceId,
  180.                                   localFs.getLength(mapOut));
  181.                 if (!localFs.mkdirs(reduceIn.getParent())) {
  182.                   throw new IOException("Mkdirs failed to create "
  183.                       + reduceIn.getParent().toString());
  184.                 }
  185.                 if (!localFs.rename(mapOut, reduceIn))
  186.                   throw new IOException("Couldn't rename " + mapOut);
  187.               } else {
  188.                 throw new InterruptedException();
  189.               }
  190.             }
  191.             if (!this.isInterrupted()) {
  192.               ReduceTask reduce = new ReduceTask(file.toString(), 
  193.                                                  reduceId, 0, mapIds.size());
  194.               JobConf localConf = new JobConf(job);
  195.               reduce.setJobFile(localFile.toString());
  196.               reduce.localizeConfiguration(localConf);
  197.               reduce.setConf(localConf);
  198.               reduce_tasks += 1;
  199.               myMetrics.launchReduce(reduce.getTaskID());
  200.               reduce.run(localConf, this);
  201.               myMetrics.completeReduce(reduce.getTaskID());
  202.               reduce_tasks -= 1;
  203.               updateCounters(reduce);
  204.             } else {
  205.               throw new InterruptedException();
  206.             }
  207.           }
  208.         } finally {
  209.           for (TaskAttemptID mapId: mapIds) {
  210.             this.mapoutputFile.removeAll(mapId);
  211.           }
  212.           if (numReduceTasks == 1) {
  213.             this.mapoutputFile.removeAll(reduceId);
  214.           }
  215.         }
  216.         // delete the temporary directory in output directory
  217.         outputCommitter.cleanupJob(jContext);
  218.         status.setCleanupProgress(1.0f);
  219.         if (killed) {
  220.           this.status.setRunState(JobStatus.KILLED);
  221.         } else {
  222.           this.status.setRunState(JobStatus.SUCCEEDED);
  223.         }
  224.         JobEndNotifier.localRunnerNotification(job, status);
  225.       } catch (Throwable t) {
  226.         try {
  227.           outputCommitter.cleanupJob(jContext);
  228.         } catch (IOException ioe) {
  229.           LOG.info("Error cleaning up job:" + id);
  230.         }
  231.         status.setCleanupProgress(1.0f);
  232.         if (killed) {
  233.           this.status.setRunState(JobStatus.KILLED);
  234.         } else {
  235.           this.status.setRunState(JobStatus.FAILED);
  236.         }
  237.         LOG.warn(id, t);
  238.         JobEndNotifier.localRunnerNotification(job, status);
  239.       } finally {
  240.         try {
  241.           fs.delete(file.getParent(), true);  // delete submit dir
  242.           localFs.delete(localFile, true);              // delete local copy
  243.         } catch (IOException e) {
  244.           LOG.warn("Error cleaning up "+id+": "+e);
  245.         }
  246.       }
  247.     }
  248.     // TaskUmbilicalProtocol methods
  249.     public JvmTask getTask(JVMId jvmId) { return null; }
  250.     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
  251.     throws IOException, InterruptedException {
  252.       LOG.info(taskStatus.getStateString());
  253.       float taskIndex = mapIds.indexOf(taskId);
  254.       if (taskIndex >= 0) {                       // mapping
  255.         float numTasks = mapIds.size();
  256.         status.setMapProgress(taskIndex/numTasks + taskStatus.getProgress()/numTasks);
  257.       } else {
  258.         status.setReduceProgress(taskStatus.getProgress());
  259.       }
  260.       currentCounters = Counters.sum(completedTaskCounters, taskStatus.getCounters());
  261.       
  262.       // ignore phase
  263.       
  264.       return true;
  265.     }
  266.     /**
  267.      * Task is reporting that it is in commit_pending
  268.      * and it is waiting for the commit Response
  269.      */
  270.     public void commitPending(TaskAttemptID taskid,
  271.                               TaskStatus taskStatus) 
  272.     throws IOException, InterruptedException {
  273.       statusUpdate(taskid, taskStatus);
  274.     }
  275.     /**
  276.      * Updates counters corresponding to completed tasks.
  277.      * @param task A map or reduce task which has just been 
  278.      * successfully completed
  279.      */ 
  280.     private void updateCounters(Task task) {
  281.       completedTaskCounters.incrAllCounters(task.getCounters());
  282.     }
  283.     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
  284.       // Ignore for now
  285.     }
  286.     
  287.     public void reportNextRecordRange(TaskAttemptID taskid, 
  288.         SortedRanges.Range range) throws IOException {
  289.       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
  290.     }
  291.     public boolean ping(TaskAttemptID taskid) throws IOException {
  292.       return true;
  293.     }
  294.     
  295.     public boolean canCommit(TaskAttemptID taskid) 
  296.     throws IOException {
  297.       return true;
  298.     }
  299.     
  300.     public void done(TaskAttemptID taskId) throws IOException {
  301.       int taskIndex = mapIds.indexOf(taskId);
  302.       if (taskIndex >= 0) {                       // mapping
  303.         status.setMapProgress(1.0f);
  304.       } else {
  305.         status.setReduceProgress(1.0f);
  306.       }
  307.     }
  308.     public synchronized void fsError(TaskAttemptID taskId, String message) 
  309.     throws IOException {
  310.       LOG.fatal("FSError: "+ message + "from task: " + taskId);
  311.     }
  312.     public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
  313.       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
  314.     }
  315.     
  316.     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
  317.         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
  318.       return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
  319.                                                false);
  320.     }
  321.     
  322.   }
  323.   public LocalJobRunner(JobConf conf) throws IOException {
  324.     this.fs = FileSystem.get(conf);
  325.     this.conf = conf;
  326.     myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
  327.   }
  328.   // JobSubmissionProtocol methods
  329.   private static int jobid = 0;
  330.   public synchronized JobID getNewJobId() {
  331.     return new JobID("local", ++jobid);
  332.   }
  333.   public JobStatus submitJob(JobID jobid) throws IOException {
  334.     return new Job(jobid, this.conf).status;
  335.   }
  336.   public void killJob(JobID id) {
  337.     jobs.get(id).killed = true;
  338.     jobs.get(id).interrupt();
  339.   }
  340.   public void setJobPriority(JobID id, String jp) throws IOException {
  341.     throw new UnsupportedOperationException("Changing job priority " +
  342.                       "in LocalJobRunner is not supported.");
  343.   }
  344.   
  345.   /** Throws {@link UnsupportedOperationException} */
  346.   public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
  347.     throw new UnsupportedOperationException("Killing tasks in " +
  348.     "LocalJobRunner is not supported");
  349.   }
  350.   public JobProfile getJobProfile(JobID id) {
  351.     Job job = jobs.get(id);
  352.     if(job != null)
  353.       return job.getProfile();
  354.     else 
  355.       return null;
  356.   }
  357.   public TaskReport[] getMapTaskReports(JobID id) {
  358.     return new TaskReport[0];
  359.   }
  360.   public TaskReport[] getReduceTaskReports(JobID id) {
  361.     return new TaskReport[0];
  362.   }
  363.   public TaskReport[] getCleanupTaskReports(JobID id) {
  364.     return new TaskReport[0];
  365.   }
  366.   public TaskReport[] getSetupTaskReports(JobID id) {
  367.     return new TaskReport[0];
  368.   }
  369.   public JobStatus getJobStatus(JobID id) {
  370.     Job job = jobs.get(id);
  371.     if(job != null)
  372.       return job.status;
  373.     else 
  374.       return null;
  375.   }
  376.   
  377.   public Counters getJobCounters(JobID id) {
  378.     Job job = jobs.get(id);
  379.     return job.currentCounters;
  380.   }
  381.   public String getFilesystemName() throws IOException {
  382.     return fs.getUri().toString();
  383.   }
  384.   
  385.   public ClusterStatus getClusterStatus(boolean detailed) {
  386.     return new ClusterStatus(1, 0, 0, map_tasks, reduce_tasks, 1, 1, 
  387.                              JobTracker.State.RUNNING);
  388.   }
  389.   public JobStatus[] jobsToComplete() {return null;}
  390.   public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid
  391.       , int fromEventId, int maxEvents) throws IOException {
  392.     return TaskCompletionEvent.EMPTY_ARRAY;
  393.   }
  394.   
  395.   public JobStatus[] getAllJobs() {return null;}
  396.   
  397.   /**
  398.    * Returns the diagnostic information for a particular task in the given job.
  399.    * To be implemented
  400.    */
  401.   public String[] getTaskDiagnostics(TaskAttemptID taskid)
  402.    throws IOException{
  403.   return new String [0];
  404.   }
  405.   /**
  406.    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
  407.    */
  408.   public String getSystemDir() {
  409.     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
  410.     return fs.makeQualified(sysDir).toString();
  411.   }
  412.   @Override
  413.   public JobStatus[] getJobsFromQueue(String queue) throws IOException {
  414.     return null;
  415.   }
  416.   @Override
  417.   public JobQueueInfo[] getQueues() throws IOException {
  418.     return null;
  419.   }
  420.   @Override
  421.   public JobQueueInfo getQueueInfo(String queue) throws IOException {
  422.     return null;
  423.   }
  424. }