TestTrackerBlacklistAcrossJobs.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.examples.SleepJob.SleepInputFormat;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster;
  25. import org.apache.hadoop.io.IntWritable;
  26. import org.apache.hadoop.io.NullWritable;
  27. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  28. import junit.framework.TestCase;
  29. public class TestTrackerBlacklistAcrossJobs extends TestCase {
  30.   private static final String hosts[] = new String[] {
  31.     "host1.rack.com", "host2.rack.com", "host3.rack.com"
  32.   };
  33.   final Path inDir = new Path("/testing");
  34.   final Path outDir = new Path("/output");
  35.   public static class SleepJobFailOnHost extends MapReduceBase
  36.     implements Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
  37.     String hostname = "";
  38.     
  39.     public void configure(JobConf job) {
  40.       this.hostname = job.get("slave.host.name");
  41.     }
  42.     
  43.     public void map(IntWritable key, IntWritable value,
  44.                     OutputCollector<IntWritable, NullWritable> output,
  45.                     Reporter reporter)
  46.     throws IOException {
  47.       if (this.hostname.equals(hosts[0])) {
  48.         // fail here
  49.         throw new IOException("failing on host: " + hosts[0]);
  50.       }
  51.     }
  52.   }
  53.   
  54.   public void testBlacklistAcrossJobs() throws IOException {
  55.     MiniDFSCluster dfs = null;
  56.     MiniMRCluster mr = null;
  57.     FileSystem fileSys = null;
  58.     Configuration conf = new Configuration();
  59.     // setup dfs and input
  60.     dfs = new MiniDFSCluster(conf, 1, true, null, hosts);
  61.     fileSys = dfs.getFileSystem();
  62.     if (!fileSys.mkdirs(inDir)) {
  63.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  64.     }
  65.     UtilsForTests.writeFile(dfs.getNameNode(), conf, 
  66.                                  new Path(inDir + "/file"), (short) 1);
  67.     // start mr cluster
  68.     JobConf jtConf = new JobConf();
  69.     jtConf.setInt("mapred.max.tracker.blacklists", 1);
  70.     mr = new MiniMRCluster(3, fileSys.getUri().toString(),
  71.                            1, null, hosts, jtConf);
  72.     // setup job configuration
  73.     JobConf mrConf = mr.createJobConf();
  74.     JobConf job = new JobConf(mrConf);
  75.     job.setInt("mapred.max.tracker.failures", 1);
  76.     job.setNumMapTasks(30);
  77.     job.setNumReduceTasks(0);
  78.     job.setMapperClass(SleepJobFailOnHost.class);
  79.     job.setMapOutputKeyClass(IntWritable.class);
  80.     job.setMapOutputValueClass(NullWritable.class);
  81.     job.setOutputFormat(NullOutputFormat.class);
  82.     job.setInputFormat(SleepInputFormat.class);
  83.     FileInputFormat.setInputPaths(job, inDir);
  84.     FileOutputFormat.setOutputPath(job, outDir);
  85.     
  86.     // run the job
  87.     JobClient jc = new JobClient(mrConf);
  88.     RunningJob running = JobClient.runJob(job);
  89.     assertEquals("Job failed", JobStatus.SUCCEEDED, running.getJobState());
  90.     assertEquals("Didn't blacklist the host", 1, 
  91.       jc.getClusterStatus().getBlacklistedTrackers());
  92.     assertEquals("Fault count should be 1", 1, mr.getFaultCount(hosts[0]));
  93.     // run the same job once again 
  94.     // there should be no change in blacklist count
  95.     running = JobClient.runJob(job);
  96.     assertEquals("Job failed", JobStatus.SUCCEEDED, running.getJobState());
  97.     assertEquals("Didn't blacklist the host", 1,
  98.       jc.getClusterStatus().getBlacklistedTrackers());
  99.     assertEquals("Fault count should be 1", 1, mr.getFaultCount(hosts[0]));
  100.   }
  101. }