Executor.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.failmon;
  19. import java.util.ArrayList;
  20. import org.apache.hadoop.conf.Configuration;
  21. /**********************************************************
  22.  * This class executes monitoring jobs on all nodes of the
  23.  * cluster, on which we intend to gather failure metrics. 
  24.  * It is basically a thread that sleeps and periodically wakes
  25.  * up to execute monitoring jobs and ship all gathered data to 
  26.  * a "safe" location, which in most cases will be the HDFS 
  27.  * filesystem of the monitored cluster.
  28.  * 
  29.  **********************************************************/
  30. public class Executor implements Runnable {
  31.   public static final int DEFAULT_LOG_INTERVAL = 3600;
  32.   public static final int DEFAULT_POLL_INTERVAL = 360;
  33.   public static int MIN_INTERVAL = 5;
  34.   public static int instances = 0;
  35.   LocalStore lstore;
  36.   ArrayList<MonitorJob> monitors;
  37.   
  38.   int interval;
  39.   int upload_interval;
  40.   int upload_counter;
  41.   
  42.   /**
  43.    * Create an instance of the class and read the configuration
  44.    * file to determine the set of jobs that will be run and the 
  45.    * maximum interval for which the thread can sleep before it 
  46.    * wakes up to execute a monitoring job on the node.
  47.    * 
  48.    */ 
  49.   public Executor(Configuration conf) {
  50.     
  51.     Environment.prepare("conf/failmon.properties");
  52.     
  53.     String localTmpDir;
  54.     
  55.     if (conf == null) {
  56.       // running as a stand-alone application
  57.       localTmpDir = System.getProperty("java.io.tmpdir");
  58.       Environment.setProperty("local.tmp.dir", localTmpDir);
  59.     } else {
  60.       // running from within Hadoop
  61.       localTmpDir = conf.get("hadoop.tmp.dir");
  62.       String hadoopLogPath = System.getProperty("hadoop.log.dir") + "/" + System.getProperty("hadoop.log.file");
  63.       Environment.setProperty("hadoop.log.file", hadoopLogPath);
  64.       Environment.setProperty("local.tmp.dir", localTmpDir);
  65.     }
  66.     
  67.     monitors = Environment.getJobs();
  68.     interval = Environment.getInterval(monitors);
  69.     upload_interval = LocalStore.UPLOAD_INTERVAL;
  70.     lstore = new LocalStore();
  71.     
  72.     if (Environment.getProperty("local.upload.interval") != null) 
  73.      upload_interval = Integer.parseInt(Environment.getProperty("local.upload.interval"));
  74.     instances++;
  75.   }
  76.   public void run() {
  77.     upload_counter = upload_interval;
  78.     Environment.logInfo("Failmon Executor thread started successfully.");
  79.     while (true) {
  80.       try {
  81.         Thread.sleep(interval * 1000);
  82.         for (int i = 0; i < monitors.size(); i++) {
  83.           monitors.get(i).counter -= interval;
  84.           if (monitors.get(i).counter <= 0) {
  85.             monitors.get(i).reset();
  86.             Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...t");
  87.             monitors.get(i).job.monitor(lstore);
  88.           }
  89.         }
  90.         upload_counter -= interval;
  91.         if (upload_counter <= 0) {
  92.           lstore.upload();
  93.           upload_counter = upload_interval;
  94.         }
  95.       } catch (InterruptedException e) {
  96.         e.printStackTrace();
  97.       }
  98.     }
  99.   }
  100.   public void cleanup() {
  101.     instances--;   
  102.   }
  103. }