TestJobTrackerRestartWithLostTracker.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.conf.Configuration;
  20. import org.apache.hadoop.fs.*;
  21. import org.apache.hadoop.hdfs.MiniDFSCluster;
  22. import org.apache.hadoop.mapred.TestJobTrackerRestart;
  23. import junit.framework.TestCase;
  24. import java.io.*;
  25. /** 
  26.  * This test checks if the jobtracker can detect and recover a tracker that was
  27.  * lost while the jobtracker was down.
  28.  */
  29. public class TestJobTrackerRestartWithLostTracker extends TestCase {
  30.   final Path testDir = new Path("/jt-restart-lost-tt-testing");
  31.   final Path inDir = new Path(testDir, "input");
  32.   final Path shareDir = new Path(testDir, "share");
  33.   final Path outputDir = new Path(testDir, "output");
  34.   
  35.   private JobConf configureJob(JobConf conf, int maps, int reduces,
  36.                                String mapSignal, String redSignal) 
  37.   throws IOException {
  38.     UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
  39.         maps, reduces, "test-jobtracker-restart-with-lost-tt", 
  40.         mapSignal, redSignal);
  41.     return conf;
  42.   }
  43.   
  44.   public void testRecoveryWithLostTracker(MiniDFSCluster dfs,
  45.                                           MiniMRCluster mr) 
  46.   throws IOException {
  47.     FileSystem fileSys = dfs.getFileSystem();
  48.     JobConf jobConf = mr.createJobConf();
  49.     int numMaps = 50;
  50.     int numReds = 1;
  51.     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
  52.     String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
  53.     
  54.     // Configure the jobs
  55.     JobConf job = configureJob(jobConf, numMaps, numReds, 
  56.                                mapSignalFile, redSignalFile);
  57.       
  58.     fileSys.delete(shareDir, true);
  59.     
  60.     // Submit a master job   
  61.     JobClient jobClient = new JobClient(job);
  62.     RunningJob rJob = jobClient.submitJob(job);
  63.     JobID id = rJob.getID();
  64.     
  65.     // wait for the job to be inited
  66.     mr.initializeJob(id);
  67.     
  68.     // Make sure that the master job is 50% completed
  69.     while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
  70.            < 0.5f) {
  71.       UtilsForTests.waitFor(100);
  72.     }
  73.     // Kill the jobtracker
  74.     mr.stopJobTracker();
  75.     // Signal the maps to complete
  76.     UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
  77.     
  78.     // Enable recovery on restart
  79.     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
  80.                                       true);
  81.     
  82.     // Kill the 2nd tasktracker
  83.     mr.stopTaskTracker(1);
  84.     
  85.     // Wait for a minute before submitting a job
  86.     UtilsForTests.waitFor(60 * 1000);
  87.     
  88.     // Restart the jobtracker
  89.     mr.startJobTracker();
  90.     // Check if the jobs are still running
  91.     
  92.     // Wait for the JT to be ready
  93.     UtilsForTests.waitForJobTracker(jobClient);
  94.     // Signal the reducers to complete
  95.     UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
  96.                               redSignalFile);
  97.     
  98.     UtilsForTests.waitTillDone(jobClient);
  99.     // Check if the tasks on the lost tracker got re-executed
  100.     assertEquals("Tracker killed while the jobtracker was down did not get lost "
  101.                  + "upon restart", 
  102.                  jobClient.getClusterStatus().getTaskTrackers(), 1);
  103.     // validate the history file
  104.     TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
  105.     TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
  106.   }
  107.   
  108.   public void testRestartWithLostTracker() throws IOException {
  109.     String namenode = null;
  110.     MiniDFSCluster dfs = null;
  111.     MiniMRCluster mr = null;
  112.     FileSystem fileSys = null;
  113.     try {
  114.       Configuration conf = new Configuration();
  115.       conf.setBoolean("dfs.replication.considerLoad", false);
  116.       dfs = new MiniDFSCluster(conf, 1, true, null, null);
  117.       dfs.waitActive();
  118.       fileSys = dfs.getFileSystem();
  119.       
  120.       // clean up
  121.       fileSys.delete(testDir, true);
  122.       
  123.       if (!fileSys.mkdirs(inDir)) {
  124.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  125.       }
  126.       // Write the input file
  127.       UtilsForTests.writeFile(dfs.getNameNode(), conf, 
  128.                               new Path(inDir + "/file"), (short)1);
  129.       dfs.startDataNodes(conf, 1, true, null, null, null, null);
  130.       dfs.waitActive();
  131.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
  132.                  + (dfs.getFileSystem()).getUri().getPort();
  133.       // Make sure that jobhistory leads to a proper job restart
  134.       // So keep the blocksize and the buffer size small
  135.       JobConf jtConf = new JobConf();
  136.       jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
  137.       jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
  138.       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
  139.       jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
  140.       jtConf.setInt("mapred.reduce.copy.backoff", 4);
  141.       
  142.       mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
  143.       
  144.       // Test Lost tracker case
  145.       testRecoveryWithLostTracker(dfs, mr);
  146.     } finally {
  147.       if (mr != null) {
  148.         try {
  149.           mr.shutdown();
  150.         } catch (Exception e) {}
  151.       }
  152.       if (dfs != null) {
  153.         try {
  154.           dfs.shutdown();
  155.         } catch (Exception e) {}
  156.       }
  157.     }
  158.   }
  159.   public static void main(String[] args) throws IOException {
  160.     new TestJobTrackerRestartWithLostTracker().testRestartWithLostTracker();
  161.   }
  162. }