TestMultipleLevelCaching.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster;
  25. import org.apache.hadoop.mapred.TestRackAwareTaskPlacement;
  26. /**
  27.  * This test checks whether the task caches are created and used properly.
  28.  */
  29. public class TestMultipleLevelCaching extends TestCase {
  30.   private static final int MAX_LEVEL = 5;
  31.   final Path inDir = new Path("/cachetesting");
  32.   final Path outputPath = new Path("/output");
  33.   /**
  34.    * Returns a string representing a rack with level + 1 nodes in the topology
  35.    * for the rack.
  36.    * For id = 2, level = 2 we get /a/b2/c2
  37.    *     id = 1, level = 3 we get /a/b1/c1/d1
  38.    * NOTE There should always be one shared node i.e /a 
  39.    * @param id Unique Id for the rack
  40.    * @param level The level in the topology where the separation starts
  41.    */
  42.   private static String getRack(int id, int level) {
  43.     StringBuilder rack = new StringBuilder();
  44.     char alpha = 'a';
  45.     int length = level + 1;
  46.     while (length > level) {
  47.       rack.append("/");
  48.       rack.append(alpha);
  49.       ++alpha;
  50.       --length;
  51.     }
  52.     while (length > 0) {
  53.       rack.append("/");
  54.       rack.append(alpha);
  55.       rack.append(id);
  56.       ++alpha;
  57.       --length;
  58.     }
  59.     return rack.toString();
  60.   }
  61.   public void testMultiLevelCaching() throws IOException {
  62.     for (int i = 1 ; i <= MAX_LEVEL; ++i) {
  63.       testCachingAtLevel(i);
  64.     }
  65.   }
  66.   private void testCachingAtLevel(int level) throws IOException {
  67.     String namenode = null;
  68.     MiniDFSCluster dfs = null;
  69.     MiniMRCluster mr = null;
  70.     FileSystem fileSys = null;
  71.     String testName = "TestMultiLevelCaching";
  72.     try {
  73.       final int taskTrackers = 1;
  74.       // generate the racks
  75.       // use rack1 for data node
  76.       String rack1 = getRack(0, level);
  77.       // use rack2 for task tracker
  78.       String rack2 = getRack(1, level);
  79.       Configuration conf = new Configuration();
  80.       // Run a datanode on host1 under /a/b/c/..../d1/e1/f1
  81.       dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, 
  82.                                new String[] {"host1.com"});
  83.       dfs.waitActive();
  84.       fileSys = dfs.getFileSystem();
  85.       if (!fileSys.mkdirs(inDir)) {
  86.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  87.       }
  88.       UtilsForTests.writeFile(dfs.getNameNode(), conf, 
  89.                              new Path(inDir + "/file"), (short)1);
  90.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
  91.                  (dfs.getFileSystem()).getUri().getPort();
  92.       // Run a job with the (only)tasktracker on host2 under diff topology
  93.       // e.g /a/b/c/..../d2/e2/f2. 
  94.       JobConf jc = new JobConf();
  95.       // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) 
  96.       //               + 1 (for host)
  97.       jc.setInt("mapred.task.cache.levels", level + 2);
  98.       mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, 
  99.                       new String[] {"host2.com"}, jc);
  100.       /* The job is configured with 1 map for one (non-splittable) file. 
  101.        * Since the datanode is running under different subtree, there is no
  102.        * node-level data locality but there should be topological locality.
  103.        */
  104.       TestRackAwareTaskPlacement.launchJobAndTestCounters(
  105.        testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
  106.       mr.shutdown();
  107.     } finally {
  108.       fileSys.delete(inDir, true);
  109.       fileSys.delete(outputPath, true);
  110.       if (dfs != null) { 
  111.         dfs.shutdown(); 
  112.       }
  113.     }
  114.   }
  115. }