Child.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.File;
  21. import java.io.IOException;
  22. import java.io.PrintStream;
  23. import java.net.InetSocketAddress;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.fs.FSError;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.FileUtil;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.ipc.RPC;
  31. import org.apache.hadoop.mapred.JvmTask;
  32. import org.apache.hadoop.metrics.MetricsContext;
  33. import org.apache.hadoop.metrics.MetricsUtil;
  34. import org.apache.hadoop.metrics.jvm.JvmMetrics;
  35. import org.apache.log4j.LogManager;
  36. import org.apache.hadoop.util.StringUtils;
  37. /** 
  38.  * The main() for child processes. 
  39.  */
  40. class Child {
  41.   public static final Log LOG =
  42.     LogFactory.getLog(TaskTracker.class);
  43.   static volatile TaskAttemptID taskid = null;
  44.   static volatile boolean isCleanup;
  45.   public static void main(String[] args) throws Throwable {
  46.     LOG.debug("Child starting");
  47.     JobConf defaultConf = new JobConf();
  48.     String host = args[0];
  49.     int port = Integer.parseInt(args[1]);
  50.     InetSocketAddress address = new InetSocketAddress(host, port);
  51.     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
  52.     final int SLEEP_LONGER_COUNT = 5;
  53.     int jvmIdInt = Integer.parseInt(args[3]);
  54.     JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
  55.     TaskUmbilicalProtocol umbilical =
  56.       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
  57.           TaskUmbilicalProtocol.versionID,
  58.           address,
  59.           defaultConf);
  60.     int numTasksToExecute = -1; //-1 signifies "no limit"
  61.     int numTasksExecuted = 0;
  62.     Runtime.getRuntime().addShutdownHook(new Thread() {
  63.       public void run() {
  64.         try {
  65.           if (taskid != null) {
  66.             TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
  67.           }
  68.         } catch (Throwable throwable) {
  69.         }
  70.       }
  71.     });
  72.     Thread t = new Thread() {
  73.       public void run() {
  74.         //every so often wake up and syncLogs so that we can track
  75.         //logs of the currently running task
  76.         while (true) {
  77.           try {
  78.             Thread.sleep(5000);
  79.             if (taskid != null) {
  80.               TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
  81.             }
  82.           } catch (InterruptedException ie) {
  83.           } catch (IOException iee) {
  84.             LOG.error("Error in syncLogs: " + iee);
  85.             System.exit(-1);
  86.           }
  87.         }
  88.       }
  89.     };
  90.     t.setName("Thread for syncLogs");
  91.     t.setDaemon(true);
  92.     t.start();
  93.     //for the memory management, a PID file is written and the PID file
  94.     //is written once per JVM. We simply symlink the file on a per task
  95.     //basis later (see below). Long term, we should change the Memory
  96.     //manager to use JVMId instead of TaskAttemptId
  97.     Path srcPidPath = null;
  98.     Path dstPidPath = null;
  99.     int idleLoopCount = 0;
  100.     Task task = null;
  101.     try {
  102.       while (true) {
  103.         taskid = null;
  104.         JvmTask myTask = umbilical.getTask(jvmId);
  105.         if (myTask.shouldDie()) {
  106.           break;
  107.         } else {
  108.           if (myTask.getTask() == null) {
  109.             taskid = null;
  110.             if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
  111.               //we sleep for a bigger interval when we don't receive
  112.               //tasks for a while
  113.               Thread.sleep(1500);
  114.             } else {
  115.               Thread.sleep(500);
  116.             }
  117.             continue;
  118.           }
  119.         }
  120.         idleLoopCount = 0;
  121.         task = myTask.getTask();
  122.         taskid = task.getTaskID();
  123.         isCleanup = task.isTaskCleanupTask();
  124.         // reset the statistics for the task
  125.         FileSystem.clearStatistics();
  126.         //create the index file so that the log files 
  127.         //are viewable immediately
  128.         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
  129.         JobConf job = new JobConf(task.getJobFile());
  130.         if (job.getBoolean("task.memory.mgmt.enabled", false)) {
  131.           if (srcPidPath == null) {
  132.             srcPidPath = new Path(task.getPidFile());
  133.           }
  134.           //since the JVM is running multiple tasks potentially, we need
  135.           //to do symlink stuff only for the subsequent tasks
  136.           if (!taskid.equals(firstTaskid)) {
  137.             dstPidPath = new Path(task.getPidFile());
  138.             FileUtil.symLink(srcPidPath.toUri().getPath(), 
  139.                 dstPidPath.toUri().getPath());
  140.           }
  141.         }
  142.         //setupWorkDir actually sets up the symlinks for the distributed
  143.         //cache. After a task exits we wipe the workdir clean, and hence
  144.         //the symlinks have to be rebuilt.
  145.         TaskRunner.setupWorkDir(job);
  146.         numTasksToExecute = job.getNumTasksToExecutePerJvm();
  147.         assert(numTasksToExecute != 0);
  148.         TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
  149.         task.setConf(job);
  150.         defaultConf.addResource(new Path(task.getJobFile()));
  151.         // Initiate Java VM metrics
  152.         JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
  153.         // use job-specified working directory
  154.         FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
  155.         try {
  156.           task.run(job, umbilical);             // run the task
  157.         } finally {
  158.           TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
  159.           if (!taskid.equals(firstTaskid) && 
  160.               job.getBoolean("task.memory.mgmt.enabled", false)) {
  161.             // delete the pid-file's symlink
  162.             new File(dstPidPath.toUri().getPath()).delete();
  163.           }
  164.         }
  165.         if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
  166.           break;
  167.         }
  168.       }
  169.     } catch (FSError e) {
  170.       LOG.fatal("FSError from child", e);
  171.       umbilical.fsError(taskid, e.getMessage());
  172.     } catch (Throwable throwable) {
  173.       LOG.warn("Error running child", throwable);
  174.       try {
  175.         if (task != null) {
  176.           // do cleanup for the task
  177.           task.taskCleanup(umbilical);
  178.         }
  179.       } catch (Throwable th) {
  180.         LOG.info("Error cleaning up" + th);
  181.       }
  182.       // Report back any failures, for diagnostic purposes
  183.       ByteArrayOutputStream baos = new ByteArrayOutputStream();
  184.       throwable.printStackTrace(new PrintStream(baos));
  185.       if (taskid != null) {
  186.         umbilical.reportDiagnosticInfo(taskid, baos.toString());
  187.       }
  188.     } finally {
  189.       RPC.stopProxy(umbilical);
  190.       MetricsContext metricsContext = MetricsUtil.getContext("mapred");
  191.       metricsContext.close();
  192.       // Shutting down log4j of the child-vm... 
  193.       // This assumes that on return from Task.run() 
  194.       // there is no more logging done.
  195.       LogManager.shutdown();
  196.     }
  197.   }
  198. }