HadoopServer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.eclipse.server;
  19. import java.io.File;
  20. import java.io.FileOutputStream;
  21. import java.io.IOException;
  22. import java.util.Collection;
  23. import java.util.Collections;
  24. import java.util.HashSet;
  25. import java.util.Map;
  26. import java.util.Set;
  27. import java.util.TreeMap;
  28. import java.util.logging.Logger;
  29. import javax.xml.parsers.DocumentBuilder;
  30. import javax.xml.parsers.DocumentBuilderFactory;
  31. import javax.xml.parsers.ParserConfigurationException;
  32. import org.apache.hadoop.conf.Configuration;
  33. import org.apache.hadoop.eclipse.Activator;
  34. import org.apache.hadoop.fs.FileSystem;
  35. import org.apache.hadoop.mapred.JobClient;
  36. import org.apache.hadoop.mapred.JobConf;
  37. import org.apache.hadoop.mapred.JobID;
  38. import org.apache.hadoop.mapred.JobStatus;
  39. import org.apache.hadoop.mapred.RunningJob;
  40. import org.eclipse.core.runtime.IProgressMonitor;
  41. import org.eclipse.core.runtime.IStatus;
  42. import org.eclipse.core.runtime.Status;
  43. import org.eclipse.core.runtime.jobs.Job;
  44. import org.eclipse.swt.widgets.Display;
  45. import org.w3c.dom.Document;
  46. import org.w3c.dom.Element;
  47. import org.w3c.dom.Node;
  48. import org.w3c.dom.NodeList;
  49. import org.w3c.dom.Text;
  50. import org.xml.sax.SAXException;
  51. /**
  52.  * Representation of a Hadoop location, meaning of the master node (NameNode,
  53.  * JobTracker).
  54.  * 
  55.  * <p>
  56.  * This class does not create any SSH connection anymore. Tunneling must be
  57.  * setup outside of Eclipse for now (using Putty or <tt>ssh -D&lt;port&gt;
  58.  * &lt;host&gt;</tt>)
  59.  * 
  60.  * <p>
  61.  * <em> TODO </em>
  62.  * <li> Disable the updater if a location becomes unreachable or fails for
  63.  * tool long
  64.  * <li> Stop the updater on location's disposal/removal
  65.  */
  66. public class HadoopServer {
  67.   /**
  68.    * Frequency of location status observations expressed as the delay in ms
  69.    * between each observation
  70.    * 
  71.    * TODO Add a preference parameter for this
  72.    */
  73.   protected static final long STATUS_OBSERVATION_DELAY = 1500;
  74.   /**
  75.    * 
  76.    */
  77.   public class LocationStatusUpdater extends Job {
  78.     JobClient client = null;
  79.     /**
  80.      * Setup the updater
  81.      */
  82.     public LocationStatusUpdater() {
  83.       super("Map/Reduce location status updater");
  84.       this.setSystem(true);
  85.     }
  86.     /* @inheritDoc */
  87.     @Override
  88.     protected IStatus run(IProgressMonitor monitor) {
  89.       if (client == null) {
  90.         try {
  91.           client = HadoopServer.this.getJobClient();
  92.         } catch (IOException ioe) {
  93.           client = null;
  94.           return new Status(Status.ERROR, Activator.PLUGIN_ID, 0,
  95.               "Cannot connect to the Map/Reduce location: "
  96.                             + HadoopServer.this.getLocationName(),
  97.                             ioe);
  98.         }
  99.       }
  100.       try {
  101.         // Set of all known existing Job IDs we want fresh info of
  102.         Set<JobID> missingJobIds =
  103.             new HashSet<JobID>(runningJobs.keySet());
  104.         JobStatus[] jstatus = client.jobsToComplete();
  105.         for (JobStatus status : jstatus) {
  106.           JobID jobId = status.getJobID();
  107.           missingJobIds.remove(jobId);
  108.           HadoopJob hJob;
  109.           synchronized (HadoopServer.this.runningJobs) {
  110.             hJob = runningJobs.get(jobId);
  111.             if (hJob == null) {
  112.               // Unknown job, create an entry
  113.               RunningJob running = client.getJob(jobId);
  114.               hJob =
  115.                   new HadoopJob(HadoopServer.this, jobId, running, status);
  116.               newJob(hJob);
  117.             }
  118.           }
  119.           // Update HadoopJob with fresh infos
  120.           updateJob(hJob, status);
  121.         }
  122.         // Ask explicitly for fresh info for these Job IDs
  123.         for (JobID jobId : missingJobIds) {
  124.           HadoopJob hJob = runningJobs.get(jobId);
  125.           if (!hJob.isCompleted())
  126.             updateJob(hJob, null);
  127.         }
  128.       } catch (IOException ioe) {
  129.         client = null;
  130.         return new Status(Status.ERROR, Activator.PLUGIN_ID, 0,
  131.             "Cannot retrieve running Jobs on location: "
  132.                           + HadoopServer.this.getLocationName(), ioe);
  133.       }
  134.       // Schedule the next observation
  135.       schedule(STATUS_OBSERVATION_DELAY);
  136.       return Status.OK_STATUS;
  137.     }
  138.     /**
  139.      * Stores and make the new job available
  140.      * 
  141.      * @param data
  142.      */
  143.     private void newJob(final HadoopJob data) {
  144.       runningJobs.put(data.getJobID(), data);
  145.       Display.getDefault().asyncExec(new Runnable() {
  146.         public void run() {
  147.           fireJobAdded(data);
  148.         }
  149.       });
  150.     }
  151.     /**
  152.      * Updates the status of a job
  153.      * 
  154.      * @param job the job to update
  155.      */
  156.     private void updateJob(final HadoopJob job, JobStatus status) {
  157.       job.update(status);
  158.       Display.getDefault().asyncExec(new Runnable() {
  159.         public void run() {
  160.           fireJobChanged(job);
  161.         }
  162.       });
  163.     }
  164.   }
  165.   static Logger log = Logger.getLogger(HadoopServer.class.getName());
  166.   /**
  167.    * Hadoop configuration of the location. Also contains specific parameters
  168.    * for the plug-in. These parameters are prefix with eclipse.plug-in.*
  169.    */
  170.   private Configuration conf;
  171.   /**
  172.    * Jobs listeners
  173.    */
  174.   private Set<IJobListener> jobListeners = new HashSet<IJobListener>();
  175.   /**
  176.    * Jobs running on this location. The keys of this map are the Job IDs.
  177.    */
  178.   private transient Map<JobID, HadoopJob> runningJobs =
  179.       Collections.synchronizedMap(new TreeMap<JobID, HadoopJob>());
  180.   /**
  181.    * Status updater for this location
  182.    */
  183.   private LocationStatusUpdater statusUpdater;
  184.   // state and status - transient
  185.   private transient String state = "";
  186.   /**
  187.    * Creates a new default Hadoop location
  188.    */
  189.   public HadoopServer() {
  190.     this.conf = new Configuration();
  191.     this.addPluginConfigDefaultProperties();
  192.   }
  193.   /**
  194.    * Creates a location from a file
  195.    * 
  196.    * @throws IOException
  197.    * @throws SAXException
  198.    * @throws ParserConfigurationException
  199.    */
  200.   public HadoopServer(File file) throws ParserConfigurationException,
  201.       SAXException, IOException {
  202.     this.conf = new Configuration();
  203.     this.addPluginConfigDefaultProperties();
  204.     this.loadFromXML(file);
  205.   }
  206.   /**
  207.    * Create a new Hadoop location by copying an already existing one.
  208.    * 
  209.    * @param source the location to copy
  210.    */
  211.   public HadoopServer(HadoopServer existing) {
  212.     this();
  213.     this.load(existing);
  214.   }
  215.   public void addJobListener(IJobListener l) {
  216.     jobListeners.add(l);
  217.   }
  218.   public void dispose() {
  219.     // TODO close DFS connections?
  220.   }
  221.   /**
  222.    * List all elements that should be present in the Server window (all
  223.    * servers and all jobs running on each servers)
  224.    * 
  225.    * @return collection of jobs for this location
  226.    */
  227.   public Collection<HadoopJob> getJobs() {
  228.     startStatusUpdater();
  229.     return this.runningJobs.values();
  230.   }
  231.   /**
  232.    * Remove the given job from the currently running jobs map
  233.    * 
  234.    * @param job the job to remove
  235.    */
  236.   public void purgeJob(final HadoopJob job) {
  237.     runningJobs.remove(job.getJobID());
  238.     Display.getDefault().asyncExec(new Runnable() {
  239.       public void run() {
  240.         fireJobRemoved(job);
  241.       }
  242.     });
  243.   }
  244.   /**
  245.    * Returns the {@link Configuration} defining this location.
  246.    * 
  247.    * @return the location configuration
  248.    */
  249.   public Configuration getConfiguration() {
  250.     return this.conf;
  251.   }
  252.   /**
  253.    * Gets a Hadoop configuration property value
  254.    * 
  255.    * @param prop the configuration property
  256.    * @return the property value
  257.    */
  258.   public String getConfProp(ConfProp prop) {
  259.     return prop.get(conf);
  260.   }
  261.   /**
  262.    * Gets a Hadoop configuration property value
  263.    * 
  264.    * @param propName the property name
  265.    * @return the property value
  266.    */
  267.   public String getConfProp(String propName) {
  268.     return this.conf.get(propName);
  269.   }
  270.   public String getLocationName() {
  271.     return ConfProp.PI_LOCATION_NAME.get(conf);
  272.   }
  273.   /**
  274.    * Returns the master host name of the Hadoop location (the Job tracker)
  275.    * 
  276.    * @return the host name of the Job tracker
  277.    */
  278.   public String getMasterHostName() {
  279.     return getConfProp(ConfProp.PI_JOB_TRACKER_HOST);
  280.   }
  281.   public String getState() {
  282.     return state;
  283.   }
  284.   /**
  285.    * Overwrite this location with the given existing location
  286.    * 
  287.    * @param existing the existing location
  288.    */
  289.   public void load(HadoopServer existing) {
  290.     this.conf = new Configuration(existing.conf);
  291.   }
  292.   /**
  293.    * Overwrite this location with settings available in the given XML file.
  294.    * The existing configuration is preserved if the XML file is invalid.
  295.    * 
  296.    * @param file the file path of the XML file
  297.    * @return validity of the XML file
  298.    * @throws ParserConfigurationException
  299.    * @throws IOException
  300.    * @throws SAXException
  301.    */
  302.   public boolean loadFromXML(File file) throws ParserConfigurationException,
  303.       SAXException, IOException {
  304.     Configuration newConf = new Configuration(this.conf);
  305.     DocumentBuilder builder =
  306.         DocumentBuilderFactory.newInstance().newDocumentBuilder();
  307.     Document document = builder.parse(file);
  308.     Element root = document.getDocumentElement();
  309.     if (!"configuration".equals(root.getTagName()))
  310.       return false;
  311.     NodeList props = root.getChildNodes();
  312.     for (int i = 0; i < props.getLength(); i++) {
  313.       Node propNode = props.item(i);
  314.       if (!(propNode instanceof Element))
  315.         continue;
  316.       Element prop = (Element) propNode;
  317.       if (!"property".equals(prop.getTagName()))
  318.         return false;
  319.       NodeList fields = prop.getChildNodes();
  320.       String attr = null;
  321.       String value = null;
  322.       for (int j = 0; j < fields.getLength(); j++) {
  323.         Node fieldNode = fields.item(j);
  324.         if (!(fieldNode instanceof Element))
  325.           continue;
  326.         Element field = (Element) fieldNode;
  327.         if ("name".equals(field.getTagName()))
  328.           attr = ((Text) field.getFirstChild()).getData();
  329.         if ("value".equals(field.getTagName()) && field.hasChildNodes())
  330.           value = ((Text) field.getFirstChild()).getData();
  331.       }
  332.       if (attr != null && value != null)
  333.         newConf.set(attr, value);
  334.     }
  335.     this.conf = newConf;
  336.     return true;
  337.   }
  338.   /**
  339.    * Sets a Hadoop configuration property value
  340.    * 
  341.    * @param prop the property
  342.    * @param propvalue the property value
  343.    */
  344.   public void setConfProp(ConfProp prop, String propValue) {
  345.     prop.set(conf, propValue);
  346.   }
  347.   /**
  348.    * Sets a Hadoop configuration property value
  349.    * 
  350.    * @param propName the property name
  351.    * @param propValue the property value
  352.    */
  353.   public void setConfProp(String propName, String propValue) {
  354.     this.conf.set(propName, propValue);
  355.   }
  356.   public void setLocationName(String newName) {
  357.     ConfProp.PI_LOCATION_NAME.set(conf, newName);
  358.   }
  359.   /**
  360.    * Write this location settings to the given output stream
  361.    * 
  362.    * @param out the output stream
  363.    * @throws IOException
  364.    */
  365.   public void storeSettingsToFile(File file) throws IOException {
  366.     FileOutputStream fos = new FileOutputStream(file);
  367.     this.conf.writeXml(fos);
  368.     fos.close();
  369.   }
  370.   /* @inheritDoc */
  371.   @Override
  372.   public String toString() {
  373.     return this.getLocationName();
  374.   }
  375.   /**
  376.    * Fill the configuration with valid default values
  377.    */
  378.   private void addPluginConfigDefaultProperties() {
  379.     for (ConfProp prop : ConfProp.values()) {
  380.       if (conf.get(prop.name) == null)
  381.         conf.set(prop.name, prop.defVal);
  382.     }
  383.   }
  384.   /**
  385.    * Starts the location status updater
  386.    */
  387.   private synchronized void startStatusUpdater() {
  388.     if (statusUpdater == null) {
  389.       statusUpdater = new LocationStatusUpdater();
  390.       statusUpdater.schedule();
  391.     }
  392.   }
  393.   /*
  394.    * Rewrite of the connecting and tunneling to the Hadoop location
  395.    */
  396.   /**
  397.    * Provides access to the default file system of this location.
  398.    * 
  399.    * @return a {@link FileSystem}
  400.    */
  401.   public FileSystem getDFS() throws IOException {
  402.     return FileSystem.get(this.conf);
  403.   }
  404.   /**
  405.    * Provides access to the Job tracking system of this location
  406.    * 
  407.    * @return a {@link JobClient}
  408.    */
  409.   public JobClient getJobClient() throws IOException {
  410.     JobConf jconf = new JobConf(this.conf);
  411.     return new JobClient(jconf);
  412.   }
  413.   /*
  414.    * Listeners handling
  415.    */
  416.   protected void fireJarPublishDone(JarModule jar) {
  417.     for (IJobListener listener : jobListeners) {
  418.       listener.publishDone(jar);
  419.     }
  420.   }
  421.   protected void fireJarPublishStart(JarModule jar) {
  422.     for (IJobListener listener : jobListeners) {
  423.       listener.publishStart(jar);
  424.     }
  425.   }
  426.   protected void fireJobAdded(HadoopJob job) {
  427.     for (IJobListener listener : jobListeners) {
  428.       listener.jobAdded(job);
  429.     }
  430.   }
  431.   protected void fireJobRemoved(HadoopJob job) {
  432.     for (IJobListener listener : jobListeners) {
  433.       listener.jobRemoved(job);
  434.     }
  435.   }
  436.   protected void fireJobChanged(HadoopJob job) {
  437.     for (IJobListener listener : jobListeners) {
  438.       listener.jobChanged(job);
  439.     }
  440.   }
  441. }