JobEndNotifier.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.concurrent.BlockingQueue;
  21. import java.util.concurrent.DelayQueue;
  22. import java.util.concurrent.Delayed;
  23. import java.util.concurrent.TimeUnit;
  24. import org.apache.commons.httpclient.HttpClient;
  25. import org.apache.commons.httpclient.HttpMethod;
  26. import org.apache.commons.httpclient.URI;
  27. import org.apache.commons.httpclient.methods.GetMethod;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. public class JobEndNotifier {
  31.   private static final Log LOG =
  32.     LogFactory.getLog(JobEndNotifier.class.getName());
  33.   private static Thread thread;
  34.   private static volatile boolean running;
  35.   private static BlockingQueue<JobEndStatusInfo> queue =
  36.     new DelayQueue<JobEndStatusInfo>();
  37.   public static void startNotifier() {
  38.     running = true;
  39.     thread = new Thread(
  40.                         new Runnable() {
  41.                           public void run() {
  42.                             try {
  43.                               while (running) {
  44.                                 sendNotification(queue.take());
  45.                               }
  46.                             }
  47.                             catch (InterruptedException irex) {
  48.                               if (running) {
  49.                                 LOG.error("Thread has ended unexpectedly", irex);
  50.                               }
  51.                             }
  52.                           }
  53.                           private void sendNotification(JobEndStatusInfo notification) {
  54.                             try {
  55.                               int code = httpNotification(notification.getUri());
  56.                               if (code != 200) {
  57.                                 throw new IOException("Invalid response status code: " + code);
  58.                               }
  59.                             }
  60.                             catch (IOException ioex) {
  61.                               LOG.error("Notification failure [" + notification + "]", ioex);
  62.                               if (notification.configureForRetry()) {
  63.                                 try {
  64.                                   queue.put(notification);
  65.                                 }
  66.                                 catch (InterruptedException iex) {
  67.                                   LOG.error("Notification queuing error [" + notification + "]",
  68.                                             iex);
  69.                                 }
  70.                               }
  71.                             }
  72.                             catch (Exception ex) {
  73.                               LOG.error("Notification failure [" + notification + "]", ex);
  74.                             }
  75.                           }
  76.                         }
  77.                         );
  78.     thread.start();
  79.   }
  80.   public static void stopNotifier() {
  81.     running = false;
  82.     thread.interrupt();
  83.   }
  84.   private static JobEndStatusInfo createNotification(JobConf conf,
  85.                                                      JobStatus status) {
  86.     JobEndStatusInfo notification = null;
  87.     String uri = conf.getJobEndNotificationURI();
  88.     if (uri != null) {
  89.       // +1 to make logic for first notification identical to a retry
  90.       int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
  91.       long retryInterval = conf.getInt("job.end.retry.interval", 30000);
  92.       if (uri.contains("$jobId")) {
  93.         uri = uri.replace("$jobId", status.getJobID().toString());
  94.       }
  95.       if (uri.contains("$jobStatus")) {
  96.         String statusStr =
  97.           (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" : 
  98.             (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
  99.         uri = uri.replace("$jobStatus", statusStr);
  100.       }
  101.       notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
  102.     }
  103.     return notification;
  104.   }
  105.   public static void registerNotification(JobConf jobConf, JobStatus status) {
  106.     JobEndStatusInfo notification = createNotification(jobConf, status);
  107.     if (notification != null) {
  108.       try {
  109.         queue.put(notification);
  110.       }
  111.       catch (InterruptedException iex) {
  112.         LOG.error("Notification queuing failure [" + notification + "]", iex);
  113.       }
  114.     }
  115.   }
  116.   private static int httpNotification(String uri) throws IOException {
  117.     URI url = new URI(uri, false);
  118.     HttpClient m_client = new HttpClient();
  119.     HttpMethod method = new GetMethod(url.getEscapedURI());
  120.     method.setRequestHeader("Accept", "*/*");
  121.     return m_client.executeMethod(method);
  122.   }
  123.   // for use by the LocalJobRunner, without using a thread&queue,
  124.   // simple synchronous way
  125.   public static void localRunnerNotification(JobConf conf, JobStatus status) {
  126.     JobEndStatusInfo notification = createNotification(conf, status);
  127.     if (notification != null) {
  128.       while (notification.configureForRetry()) {
  129.         try {
  130.           int code = httpNotification(notification.getUri());
  131.           if (code != 200) {
  132.             throw new IOException("Invalid response status code: " + code);
  133.           }
  134.           else {
  135.             break;
  136.           }
  137.         }
  138.         catch (IOException ioex) {
  139.           LOG.error("Notification error [" + notification.getUri() + "]", ioex);
  140.         }
  141.         catch (Exception ex) {
  142.           LOG.error("Notification error [" + notification.getUri() + "]", ex);
  143.         }
  144.         try {
  145.           synchronized (Thread.currentThread()) {
  146.             Thread.currentThread().sleep(notification.getRetryInterval());
  147.           }
  148.         }
  149.         catch (InterruptedException iex) {
  150.           LOG.error("Notification retry error [" + notification + "]", iex);
  151.         }
  152.       }
  153.     }
  154.   }
  155.   private static class JobEndStatusInfo implements Delayed {
  156.     private String uri;
  157.     private int retryAttempts;
  158.     private long retryInterval;
  159.     private long delayTime;
  160.     JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
  161.       this.uri = uri;
  162.       this.retryAttempts = retryAttempts;
  163.       this.retryInterval = retryInterval;
  164.       this.delayTime = System.currentTimeMillis();
  165.     }
  166.     public String getUri() {
  167.       return uri;
  168.     }
  169.     public int getRetryAttempts() {
  170.       return retryAttempts;
  171.     }
  172.     public long getRetryInterval() {
  173.       return retryInterval;
  174.     }
  175.     public long getDelayTime() {
  176.       return delayTime;
  177.     }
  178.     public boolean configureForRetry() {
  179.       boolean retry = false;
  180.       if (getRetryAttempts() > 0) {
  181.         retry = true;
  182.         delayTime = System.currentTimeMillis() + retryInterval;
  183.       }
  184.       retryAttempts--;
  185.       return retry;
  186.     }
  187.     public long getDelay(TimeUnit unit) {
  188.       long n = this.delayTime - System.currentTimeMillis();
  189.       return unit.convert(n, TimeUnit.MILLISECONDS);
  190.     }
  191.     public int compareTo(Delayed d) {
  192.       return (int)(delayTime - ((JobEndStatusInfo)d).delayTime);
  193.     }
  194.     @Override
  195.     public boolean equals(Object o) {
  196.       if (!(o instanceof JobEndStatusInfo)) {
  197.         return false;
  198.       }
  199.       if (delayTime == ((JobEndStatusInfo)o).delayTime) {
  200.         return true;
  201.       }
  202.       return false;
  203.     }
  204.     @Override
  205.     public int hashCode() {
  206.       return 37 * 17 + (int) (delayTime^(delayTime>>>32));
  207.     }
  208.       
  209.     @Override
  210.     public String toString() {
  211.       return "URL: " + uri + " remaining retries: " + retryAttempts +
  212.         " interval: " + retryInterval;
  213.     }
  214.   }
  215. }