Application.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.net.ServerSocket;
  22. import java.net.Socket;
  23. import java.util.ArrayList;
  24. import java.util.HashMap;
  25. import java.util.List;
  26. import java.util.Map;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.commons.logging.LogFactory;
  29. import org.apache.hadoop.filecache.DistributedCache;
  30. import org.apache.hadoop.fs.FileUtil;
  31. import org.apache.hadoop.io.FloatWritable;
  32. import org.apache.hadoop.io.NullWritable;
  33. import org.apache.hadoop.io.Writable;
  34. import org.apache.hadoop.io.WritableComparable;
  35. import org.apache.hadoop.mapred.JobConf;
  36. import org.apache.hadoop.mapred.OutputCollector;
  37. import org.apache.hadoop.mapred.RecordReader;
  38. import org.apache.hadoop.mapred.Reporter;
  39. import org.apache.hadoop.mapred.TaskAttemptID;
  40. import org.apache.hadoop.mapred.TaskLog;
  41. import org.apache.hadoop.util.ReflectionUtils;
  42. import org.apache.hadoop.util.StringUtils;
  43. /**
  44.  * This class is responsible for launching and communicating with the child 
  45.  * process.
  46.  */
  47. class Application<K1 extends WritableComparable, V1 extends Writable,
  48.                   K2 extends WritableComparable, V2 extends Writable> {
  49.   private static final Log LOG = LogFactory.getLog(Application.class.getName());
  50.   private ServerSocket serverSocket;
  51.   private Process process;
  52.   private Socket clientSocket;
  53.   private OutputHandler<K2, V2> handler;
  54.   private DownwardProtocol<K1, V1> downlink;
  55.   static final boolean WINDOWS
  56.   = System.getProperty("os.name").startsWith("Windows");
  57.   /**
  58.    * Start the child process to handle the task for us.
  59.    * @param conf the task's configuration
  60.    * @param recordReader the fake record reader to update progress with
  61.    * @param output the collector to send output to
  62.    * @param reporter the reporter for the task
  63.    * @param outputKeyClass the class of the output keys
  64.    * @param outputValueClass the class of the output values
  65.    * @throws IOException
  66.    * @throws InterruptedException
  67.    */
  68.   Application(JobConf conf, 
  69.               RecordReader<FloatWritable, NullWritable> recordReader, 
  70.               OutputCollector<K2,V2> output, Reporter reporter,
  71.               Class<? extends K2> outputKeyClass,
  72.               Class<? extends V2> outputValueClass
  73.               ) throws IOException, InterruptedException {
  74.     serverSocket = new ServerSocket(0);
  75.     Map<String, String> env = new HashMap<String,String>();
  76.     // add TMPDIR environment variable with the value of java.io.tmpdir
  77.     env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
  78.     env.put("hadoop.pipes.command.port", 
  79.             Integer.toString(serverSocket.getLocalPort()));
  80.     List<String> cmd = new ArrayList<String>();
  81.     String interpretor = conf.get("hadoop.pipes.executable.interpretor");
  82.     if (interpretor != null) {
  83.       cmd.add(interpretor);
  84.     }
  85.     String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
  86.     FileUtil.chmod(executable, "a+x");
  87.     cmd.add(executable);
  88.     // wrap the command in a stdout/stderr capture
  89.     TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapred.task.id"));
  90.     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
  91.     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
  92.     long logLength = TaskLog.getTaskLogLength(conf);
  93.     cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
  94.     process = runClient(cmd, env);
  95.     clientSocket = serverSocket.accept();
  96.     handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
  97.     K2 outputKey = (K2)
  98.       ReflectionUtils.newInstance(outputKeyClass, conf);
  99.     V2 outputValue = (V2) 
  100.       ReflectionUtils.newInstance(outputValueClass, conf);
  101.     downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
  102.                                   outputKey, outputValue, conf);
  103.     downlink.start();
  104.     downlink.setJobConf(conf);
  105.   }
  106.   /**
  107.    * Get the downward protocol object that can send commands down to the
  108.    * application.
  109.    * @return the downlink proxy
  110.    */
  111.   DownwardProtocol<K1, V1> getDownlink() {
  112.     return downlink;
  113.   }
  114.   /**
  115.    * Wait for the application to finish
  116.    * @return did the application finish correctly?
  117.    * @throws Throwable
  118.    */
  119.   boolean waitForFinish() throws Throwable {
  120.     downlink.flush();
  121.     return handler.waitForFinish();
  122.   }
  123.   /**
  124.    * Abort the application and wait for it to finish.
  125.    * @param t the exception that signalled the problem
  126.    * @throws IOException A wrapper around the exception that was passed in
  127.    */
  128.   void abort(Throwable t) throws IOException {
  129.     LOG.info("Aborting because of " + StringUtils.stringifyException(t));
  130.     try {
  131.       downlink.abort();
  132.       downlink.flush();
  133.     } catch (IOException e) {
  134.       // IGNORE cleanup problems
  135.     }
  136.     try {
  137.       handler.waitForFinish();
  138.     } catch (Throwable ignored) {
  139.       process.destroy();
  140.     }
  141.     IOException wrapper = new IOException("pipe child exception");
  142.     wrapper.initCause(t);
  143.     throw wrapper;      
  144.   }
  145.   
  146.   /**
  147.    * Clean up the child procress and socket.
  148.    * @throws IOException
  149.    */
  150.   void cleanup() throws IOException {
  151.     serverSocket.close();
  152.     try {
  153.       downlink.close();
  154.     } catch (InterruptedException ie) {
  155.       Thread.currentThread().interrupt();
  156.     }      
  157.   }
  158.   /**
  159.    * Run a given command in a subprocess, including threads to copy its stdout
  160.    * and stderr to our stdout and stderr.
  161.    * @param command the command and its arguments
  162.    * @param env the environment to run the process in
  163.    * @return a handle on the process
  164.    * @throws IOException
  165.    */
  166.   static Process runClient(List<String> command, 
  167.                            Map<String, String> env) throws IOException {
  168.     ProcessBuilder builder = new ProcessBuilder(command);
  169.     if (env != null) {
  170.       builder.environment().putAll(env);
  171.     }
  172.     Process result = builder.start();
  173.     return result;
  174.   }
  175. }