IsolationRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInputStream;
  20. import java.io.File;
  21. import java.io.IOException;
  22. import java.net.URL;
  23. import java.net.URLClassLoader;
  24. import java.util.ArrayList;
  25. import java.util.List;
  26. import org.apache.commons.logging.Log;
  27. import org.apache.commons.logging.LogFactory;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.LocalDirAllocator;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.io.BytesWritable;
  32. import org.apache.hadoop.io.SequenceFile;
  33. import org.apache.hadoop.io.Text;
  34. import org.apache.hadoop.io.Writable;
  35. import org.apache.hadoop.io.WritableComparable;
  36. import org.apache.hadoop.mapred.JvmTask;
  37. public class IsolationRunner {
  38.   private static final Log LOG = 
  39.     LogFactory.getLog(IsolationRunner.class.getName());
  40.   private static class FakeUmbilical implements TaskUmbilicalProtocol {
  41.     public long getProtocolVersion(String protocol, long clientVersion) {
  42.       return TaskUmbilicalProtocol.versionID;
  43.     }
  44.     
  45.     public void done(TaskAttemptID taskid) throws IOException {
  46.       LOG.info("Task " + taskid + " reporting done.");
  47.     }
  48.     public void fsError(TaskAttemptID taskId, String message) throws IOException {
  49.       LOG.info("Task " + taskId + " reporting file system error: " + message);
  50.     }
  51.     public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
  52.       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
  53.     }
  54.     public JvmTask getTask(JVMId jvmId) throws IOException {
  55.       return null;
  56.     }
  57.     public boolean ping(TaskAttemptID taskid) throws IOException {
  58.       return true;
  59.     }
  60.     public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
  61.     throws IOException, InterruptedException {
  62.       statusUpdate(taskId, taskStatus);
  63.     }
  64.     
  65.     public boolean canCommit(TaskAttemptID taskid) throws IOException {
  66.       return true;
  67.     }
  68.     
  69.     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
  70.     throws IOException, InterruptedException {
  71.       StringBuffer buf = new StringBuffer("Task ");
  72.       buf.append(taskId);
  73.       buf.append(" making progress to ");
  74.       buf.append(taskStatus.getProgress());
  75.       String state = taskStatus.getStateString();
  76.       if (state != null) {
  77.         buf.append(" and state of ");
  78.         buf.append(state);
  79.       }
  80.       LOG.info(buf.toString());
  81.       // ignore phase
  82.       // ignore counters
  83.       return true;
  84.     }
  85.     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
  86.       LOG.info("Task " + taskid + " has problem " + trace);
  87.     }
  88.     
  89.     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
  90.         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
  91.       return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY, 
  92.                                                false);
  93.     }
  94.     public void reportNextRecordRange(TaskAttemptID taskid, 
  95.         SortedRanges.Range range) throws IOException {
  96.       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
  97.     }
  98.   }
  99.   
  100.   private static ClassLoader makeClassLoader(JobConf conf, 
  101.                                              File workDir) throws IOException {
  102.     List<URL> cp = new ArrayList<URL>();
  103.     String jar = conf.getJar();
  104.     if (jar != null) {                      // if jar exists, it into workDir
  105.       File[] libs = new File(workDir, "lib").listFiles();
  106.       if (libs != null) {
  107.         for (int i = 0; i < libs.length; i++) {
  108.           cp.add(new URL("file:" + libs[i].toString()));
  109.         }
  110.       }
  111.       cp.add(new URL("file:" + new File(workDir, "classes/").toString()));
  112.       cp.add(new URL("file:" + workDir.toString() + "/"));
  113.     }
  114.     return new URLClassLoader(cp.toArray(new URL[cp.size()]));
  115.   }
  116.   
  117.   /**
  118.    * Create empty sequence files for any of the map outputs that we don't have.
  119.    * @param fs the filesystem to create the files in
  120.    * @param dir the directory name to create the files in
  121.    * @param conf the jobconf
  122.    * @throws IOException if something goes wrong writing
  123.    */
  124.   private static void fillInMissingMapOutputs(FileSystem fs, 
  125.                                               TaskAttemptID taskId,
  126.                                               int numMaps,
  127.                                               JobConf conf) throws IOException {
  128.     Class<? extends WritableComparable> keyClass
  129.         = conf.getMapOutputKeyClass().asSubclass(WritableComparable.class);
  130.     Class<? extends Writable> valueClass
  131.         = conf.getMapOutputValueClass().asSubclass(Writable.class);
  132.     MapOutputFile namer = new MapOutputFile(taskId.getJobID());
  133.     namer.setConf(conf);
  134.     for(int i=0; i<numMaps; i++) {
  135.       Path f = namer.getInputFile(i, taskId);
  136.       if (!fs.exists(f)) {
  137.         LOG.info("Create missing input: " + f);
  138.         SequenceFile.Writer out =
  139.           SequenceFile.createWriter(fs, conf, f, keyClass, valueClass);
  140.         out.close();
  141.       }
  142.     }    
  143.   }
  144.   
  145.   /**
  146.    * Run a single task
  147.    * @param args the first argument is the task directory
  148.    */
  149.   public static void main(String[] args
  150.                           ) throws ClassNotFoundException, IOException, 
  151.                                    InterruptedException {
  152.     if (args.length != 1) {
  153.       System.out.println("Usage: IsolationRunner <path>/job.xml");
  154.       System.exit(1);
  155.     }
  156.     File jobFilename = new File(args[0]);
  157.     if (!jobFilename.exists() || !jobFilename.isFile()) {
  158.       System.out.println(jobFilename + " is not a valid job file.");
  159.       System.exit(1);
  160.     }
  161.     JobConf conf = new JobConf(new Path(jobFilename.toString()));
  162.     TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
  163.     boolean isMap = conf.getBoolean("mapred.task.is.map", true);
  164.     int partition = conf.getInt("mapred.task.partition", 0);
  165.     
  166.     // setup the local and user working directories
  167.     FileSystem local = FileSystem.getLocal(conf);
  168.     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  169.     File workDirName = new File(lDirAlloc.getLocalPathToRead(
  170.                                   TaskTracker.getLocalTaskDir(
  171.                                     taskId.getJobID().toString(), 
  172.                                     taskId.toString())
  173.                                   + Path.SEPARATOR + "work",
  174.                                   conf). toString());
  175.     local.setWorkingDirectory(new Path(workDirName.toString()));
  176.     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
  177.     
  178.     // set up a classloader with the right classpath
  179.     ClassLoader classLoader = makeClassLoader(conf, workDirName);
  180.     Thread.currentThread().setContextClassLoader(classLoader);
  181.     conf.setClassLoader(classLoader);
  182.     
  183.     Task task;
  184.     if (isMap) {
  185.       Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), 
  186.                                  "split.dta");
  187.       DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
  188.       String splitClass = Text.readString(splitFile);
  189.       BytesWritable split = new BytesWritable();
  190.       split.readFields(splitFile);
  191.       splitFile.close();
  192.       task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
  193.     } else {
  194.       int numMaps = conf.getNumMapTasks();
  195.       fillInMissingMapOutputs(local, taskId, numMaps, conf);
  196.       task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
  197.     }
  198.     task.setConf(conf);
  199.     task.run(conf, new FakeUmbilical());
  200.   }
  201. }