TestIndexCache.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataOutputStream;
  20. import java.io.FileNotFoundException;
  21. import java.io.IOException;
  22. import java.util.Random;
  23. import java.util.zip.CRC32;
  24. import java.util.zip.CheckedOutputStream;
  25. import org.apache.hadoop.fs.ChecksumException;
  26. import org.apache.hadoop.fs.FileStatus;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.fs.FSDataOutputStream;
  30. import junit.framework.TestCase;
  31. public class TestIndexCache extends TestCase {
  32.   public void testLRCPolicy() throws Exception {
  33.     Random r = new Random();
  34.     long seed = r.nextLong();
  35.     r.setSeed(seed);
  36.     System.out.println("seed: " + seed);
  37.     JobConf conf = new JobConf();
  38.     FileSystem fs = FileSystem.getLocal(conf).getRaw();
  39.     Path p = new Path(System.getProperty("test.build.data", "/tmp"),
  40.         "cache").makeQualified(fs);
  41.     fs.delete(p, true);
  42.     conf.setInt("mapred.tasktracker.indexcache.mb", 1);
  43.     final int partsPerMap = 1000;
  44.     final int bytesPerFile = partsPerMap * 24;
  45.     IndexCache cache = new IndexCache(conf);
  46.     // fill cache
  47.     int totalsize = bytesPerFile;
  48.     for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) {
  49.       Path f = new Path(p, Integer.toString(totalsize, 36));
  50.       writeFile(fs, f, totalsize, partsPerMap);
  51.       IndexRecord rec = cache.getIndexInformation(
  52.           Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f);
  53.       checkRecord(rec, totalsize);
  54.     }
  55.     // delete files, ensure cache retains all elem
  56.     for (FileStatus stat : fs.listStatus(p)) {
  57.       fs.delete(stat.getPath(),true);
  58.     }
  59.     for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
  60.       Path f = new Path(p, Integer.toString(i, 36));
  61.       IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
  62.           r.nextInt(partsPerMap), f);
  63.       checkRecord(rec, i);
  64.     }
  65.     // push oldest (bytesPerFile) out of cache
  66.     Path f = new Path(p, Integer.toString(totalsize, 36));
  67.     writeFile(fs, f, totalsize, partsPerMap);
  68.     cache.getIndexInformation(Integer.toString(totalsize, 36),
  69.         r.nextInt(partsPerMap), f);
  70.     fs.delete(f, false);
  71.     // oldest fails to read, or error
  72.     boolean fnf = false;
  73.     try {
  74.       cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
  75.           r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)));
  76.     } catch (IOException e) {
  77.       if (e.getCause() == null ||
  78.           !(e.getCause()  instanceof FileNotFoundException)) {
  79.         throw e;
  80.       }
  81.       else {
  82.         fnf = true;
  83.       }
  84.     }
  85.     if (!fnf)
  86.       fail("Failed to push out last entry");
  87.     // should find all the other entries
  88.     for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
  89.       IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
  90.           r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)));
  91.       checkRecord(rec, i);
  92.     }
  93.     IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
  94.         r.nextInt(partsPerMap), f);
  95.     checkRecord(rec, totalsize);
  96.   }
  97.   public void testBadIndex() throws Exception {
  98.     final int parts = 30;
  99.     JobConf conf = new JobConf();
  100.     FileSystem fs = FileSystem.getLocal(conf).getRaw();
  101.     Path p = new Path(System.getProperty("test.build.data", "/tmp"),
  102.         "cache").makeQualified(fs);
  103.     fs.delete(p, true);
  104.     conf.setInt("mapred.tasktracker.indexcache.mb", 1);
  105.     IndexCache cache = new IndexCache(conf);
  106.     Path f = new Path(p, "badindex");
  107.     FSDataOutputStream out = fs.create(f, false);
  108.     CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
  109.     DataOutputStream dout = new DataOutputStream(iout);
  110.     for (int i = 0; i < parts; ++i) {
  111.       for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
  112.         if (0 == (i % 3)) {
  113.           dout.writeLong(i);
  114.         } else {
  115.           out.writeLong(i);
  116.         }
  117.       }
  118.     }
  119.     out.writeLong(iout.getChecksum().getValue());
  120.     dout.close();
  121.     try {
  122.       cache.getIndexInformation("badindex", 7, f);
  123.       fail("Did not detect bad checksum");
  124.     } catch (IOException e) {
  125.       if (!(e.getCause() instanceof ChecksumException)) {
  126.         throw e;
  127.       }
  128.     }
  129.   }
  130.   private static void checkRecord(IndexRecord rec, long fill) {
  131.     assertEquals(fill, rec.startOffset);
  132.     assertEquals(fill, rec.rawLength);
  133.     assertEquals(fill, rec.partLength);
  134.   }
  135.   private static void writeFile(FileSystem fs, Path f, long fill, int parts)
  136.       throws IOException {
  137.     FSDataOutputStream out = fs.create(f, false);
  138.     CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
  139.     DataOutputStream dout = new DataOutputStream(iout);
  140.     for (int i = 0; i < parts; ++i) {
  141.       for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
  142.         dout.writeLong(fill);
  143.       }
  144.     }
  145.     out.writeLong(iout.getChecksum().getValue());
  146.     dout.close();
  147.   }
  148. }