TestRackAwareTaskPlacement.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster;
  25. import org.apache.hadoop.io.BytesWritable;
  26. import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
  27. import org.apache.hadoop.mapred.lib.IdentityMapper;
  28. import org.apache.hadoop.mapred.lib.IdentityReducer;
  29. public class TestRackAwareTaskPlacement extends TestCase {
  30.   private static final String rack1[] = new String[] {
  31.     "/r1"
  32.   };
  33.   private static final String hosts1[] = new String[] {
  34.     "host1.rack1.com"
  35.   };
  36.   private static final String rack2[] = new String[] {
  37.     "/r2", "/r2"
  38.   };
  39.   private static final String hosts2[] = new String[] {
  40.     "host1.rack2.com", "host2.rack2.com"
  41.   };
  42.   private static final String hosts3[] = new String[] {
  43.     "host3.rack1.com"
  44.   };
  45.   private static final String hosts4[] = new String[] {
  46.     "host1.rack2.com"
  47.   };
  48.   final Path inDir = new Path("/racktesting");
  49.   final Path outputPath = new Path("/output");
  50.   
  51.   /**
  52.    * Launches a MR job and tests the job counters against the expected values.
  53.    * @param testName The name for the job
  54.    * @param mr The MR cluster
  55.    * @param fileSys The FileSystem
  56.    * @param in Input path
  57.    * @param out Output path
  58.    * @param numMaps Number of maps
  59.    * @param otherLocalMaps Expected value of other local maps
  60.    * @param datalocalMaps Expected value of data(node) local maps
  61.    * @param racklocalMaps Expected value of rack local maps
  62.    */
  63.   static void launchJobAndTestCounters(String jobName, MiniMRCluster mr, 
  64.                                        FileSystem fileSys, Path in, Path out,
  65.                                        int numMaps, int otherLocalMaps,
  66.                                        int dataLocalMaps, int rackLocalMaps) 
  67.   throws IOException {
  68.     JobConf jobConf = mr.createJobConf();
  69.     if (fileSys.exists(out)) {
  70.         fileSys.delete(out, true);
  71.     }
  72.     RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
  73.     Counters counters = job.getCounters();
  74.     assertEquals("Number of local maps", 
  75.             counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
  76.     assertEquals("Number of Data-local maps", 
  77.             counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 
  78.                                 dataLocalMaps);
  79.     assertEquals("Number of Rack-local maps", 
  80.             counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 
  81.                                 rackLocalMaps);
  82.     mr.waitUntilIdle();
  83.     mr.shutdown();
  84.   }
  85.   public void testTaskPlacement() throws IOException {
  86.     String namenode = null;
  87.     MiniDFSCluster dfs = null;
  88.     MiniMRCluster mr = null;
  89.     FileSystem fileSys = null;
  90.     String testName = "TestForRackAwareness";
  91.     try {
  92.       final int taskTrackers = 1;
  93.       /* Start 3 datanodes, one in rack r1, and two in r2. Create three
  94.        * files (splits).
  95.        * 1) file1, just after starting the datanode on r1, with 
  96.        *    a repl factor of 1, and,
  97.        * 2) file2 & file3 after starting the other two datanodes, with a repl 
  98.        *    factor of 3.
  99.        * At the end, file1 will be present on only datanode1, and, file2 and 
  100.        * file3, will be present on all datanodes. 
  101.        */
  102.       Configuration conf = new Configuration();
  103.       conf.setBoolean("dfs.replication.considerLoad", false);
  104.       dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
  105.       dfs.waitActive();
  106.       fileSys = dfs.getFileSystem();
  107.       if (!fileSys.mkdirs(inDir)) {
  108.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  109.       }
  110.       UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
  111.       dfs.startDataNodes(conf, 2, true, null, rack2, hosts2, null);
  112.       dfs.waitActive();
  113.       UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
  114.       UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
  115.       
  116.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
  117.                  (dfs.getFileSystem()).getUri().getPort(); 
  118.       /* Run a job with the (only)tasktracker on rack2. The rack location
  119.        * of the tasktracker will determine how many data/rack local maps it
  120.        * runs. The hostname of the tasktracker is set to same as one of the 
  121.        * datanodes.
  122.        */
  123.       mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
  124.       /* The job is configured with three maps since there are three 
  125.        * (non-splittable) files. On rack2, there are two files and both
  126.        * have repl of three. The blocks for those files must therefore be
  127.        * present on all the datanodes, in particular, the datanodes on rack2.
  128.        * The third input file is pulled from rack1.
  129.        */
  130.       launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
  131.                                2, 0);
  132.       mr.shutdown();
  133.       
  134.       /* Run a job with the (only)tasktracker on rack1.
  135.        */
  136.       mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
  137.       /* The job is configured with three maps since there are three 
  138.        * (non-splittable) files. On rack1, because of the way in which repl
  139.        * was setup while creating the files, we will have all the three files. 
  140.        * Thus, a tasktracker will find all inputs in this rack.
  141.        */
  142.       launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
  143.                                0, 3);
  144.       mr.shutdown();
  145.       
  146.     } finally {
  147.       if (dfs != null) { 
  148.         dfs.shutdown(); 
  149.       }
  150.       if (mr != null) { 
  151.         mr.shutdown();
  152.       }
  153.     }
  154.   }
  155.   static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath, 
  156.                               int numMaps, String jobName) throws IOException {
  157.     jobConf.setJobName(jobName);
  158.     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  159.     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  160.     FileInputFormat.setInputPaths(jobConf, inDir);
  161.     FileOutputFormat.setOutputPath(jobConf, outputPath);
  162.     jobConf.setMapperClass(IdentityMapper.class);
  163.     jobConf.setReducerClass(IdentityReducer.class);
  164.     jobConf.setOutputKeyClass(BytesWritable.class);
  165.     jobConf.setOutputValueClass(BytesWritable.class);
  166.     jobConf.setNumMapTasks(numMaps);
  167.     jobConf.setNumReduceTasks(0);
  168.     jobConf.setJar("build/test/testjar/testjob.jar");
  169.     return JobClient.runJob(jobConf);
  170.   }
  171. }