TestHarFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.hdfs.MiniDFSCluster;
  23. import org.apache.hadoop.fs.FSDataOutputStream;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.FsShell;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.LongWritable;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.mapred.FileInputFormat;
  30. import org.apache.hadoop.mapred.FileOutputFormat;
  31. import org.apache.hadoop.mapred.JobClient;
  32. import org.apache.hadoop.mapred.JobConf;
  33. import org.apache.hadoop.mapred.Mapper;
  34. import org.apache.hadoop.mapred.MiniMRCluster;
  35. import org.apache.hadoop.mapred.OutputCollector;
  36. import org.apache.hadoop.mapred.Reducer;
  37. import org.apache.hadoop.mapred.Reporter;
  38. import org.apache.hadoop.mapred.TextInputFormat;
  39. import org.apache.hadoop.mapred.TextOutputFormat;
  40. import org.apache.hadoop.tools.HadoopArchives;
  41. import org.apache.hadoop.util.ToolRunner;
  42. import junit.framework.TestCase;
  43. /**
  44.  * test the har file system
  45.  * create a har filesystem
  46.  * run fs commands
  47.  * and then run a map reduce job
  48.  */
  49. public class TestHarFileSystem extends TestCase {
  50.   private Path inputPath;
  51.   private MiniDFSCluster dfscluster;
  52.   private MiniMRCluster mapred;
  53.   private FileSystem fs;
  54.   private Path filea, fileb, filec;
  55.   private Path archivePath;
  56.   
  57.   protected void setUp() throws Exception {
  58.     super.setUp();
  59.     dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null);
  60.     fs = dfscluster.getFileSystem();
  61.     mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
  62.     inputPath = new Path(fs.getHomeDirectory(), "test"); 
  63.     filea = new Path(inputPath,"a");
  64.     fileb = new Path(inputPath,"b");
  65.     filec = new Path(inputPath,"c");
  66.     archivePath = new Path(fs.getHomeDirectory(), "tmp");
  67.   }
  68.   
  69.   protected void tearDown() throws Exception {
  70.     try {
  71.       if (mapred != null) {
  72.         mapred.shutdown();
  73.       }
  74.       if (dfscluster != null) {
  75.         dfscluster.shutdown();
  76.       }
  77.     } catch(Exception e) {
  78.       System.err.println(e);
  79.     }
  80.     super.tearDown();
  81.   }
  82.   
  83.   static class TextMapperReducer implements Mapper<LongWritable, Text, Text, Text>, 
  84.             Reducer<Text, Text, Text, Text> {
  85.     
  86.     public void configure(JobConf conf) {
  87.       //do nothing 
  88.     }
  89.     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  90.       output.collect(value, new Text(""));
  91.     }
  92.     public void close() throws IOException {
  93.       // do nothing
  94.     }
  95.     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  96.       while(values.hasNext()) { 
  97.         values.next();
  98.         output.collect(key, null);
  99.       }
  100.     }
  101.   }
  102.   
  103.   public void testArchives() throws Exception {
  104.     fs.mkdirs(inputPath);
  105.     
  106.     FSDataOutputStream out = fs.create(filea); 
  107.     out.write("a".getBytes());
  108.     out.close();
  109.     out = fs.create(fileb);
  110.     out.write("b".getBytes());
  111.     out.close();
  112.     out = fs.create(filec);
  113.     out.write("c".getBytes());
  114.     out.close();
  115.     Configuration conf = mapred.createJobConf();
  116.     HadoopArchives har = new HadoopArchives(conf);
  117.     String[] args = new String[3];
  118.     //check for destination not specfied
  119.     args[0] = "-archiveName";
  120.     args[1] = "foo.har";
  121.     args[2] = inputPath.toString();
  122.     int ret = ToolRunner.run(har, args);
  123.     assertTrue(ret != 0);
  124.     args = new String[4];
  125.     //check for wrong archiveName
  126.     args[0] = "-archiveName";
  127.     args[1] = "/d/foo.har";
  128.     args[2] = inputPath.toString();
  129.     args[3] = archivePath.toString();
  130.     ret = ToolRunner.run(har, args);
  131.     assertTrue(ret != 0);
  132. //  se if dest is a file 
  133.     args[1] = "foo.har";
  134.     args[3] = filec.toString();
  135.     ret = ToolRunner.run(har, args);
  136.     assertTrue(ret != 0);
  137.     //this is a valid run
  138.     args[0] = "-archiveName";
  139.     args[1] = "foo.har";
  140.     args[2] = inputPath.toString();
  141.     args[3] = archivePath.toString();
  142.     ret = ToolRunner.run(har, args);
  143.     //checl for the existenece of the archive
  144.     assertTrue(ret == 0);
  145.     ///try running it again. it should not 
  146.     // override the directory
  147.     ret = ToolRunner.run(har, args);
  148.     assertTrue(ret != 0);
  149.     Path finalPath = new Path(archivePath, "foo.har");
  150.     Path fsPath = new Path(inputPath.toUri().getPath());
  151.     String relative = fsPath.toString().substring(1);
  152.     Path filePath = new Path(finalPath, relative);
  153.     //make it a har path 
  154.     Path harPath = new Path("har://" + filePath.toUri().getPath());
  155.     assertTrue(fs.exists(new Path(finalPath, "_index")));
  156.     assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
  157.     assertTrue(!fs.exists(new Path(finalPath, "_logs")));
  158.     //creation tested
  159.     //check if the archive is same
  160.     // do ls and cat on all the files
  161.     FsShell shell = new FsShell(conf);
  162.     args = new String[2];
  163.     args[0] = "-ls";
  164.     args[1] = harPath.toString();
  165.     ret = ToolRunner.run(shell, args);
  166.     // ls should work.
  167.     assertTrue((ret == 0));
  168.     //now check for contents of filea
  169.     // fileb and filec
  170.     Path harFilea = new Path(harPath, "a");
  171.     Path harFileb = new Path(harPath, "b");
  172.     Path harFilec = new Path(harPath, "c");
  173.     FileSystem harFs = harFilea.getFileSystem(conf);
  174.     FSDataInputStream fin = harFs.open(harFilea);
  175.     byte[] b = new byte[4];
  176.     int readBytes = fin.read(b);
  177.     fin.close();
  178.     assertTrue("strings are equal ", (b[0] == "a".getBytes()[0]));
  179.     fin = harFs.open(harFileb);
  180.     fin.read(b);
  181.     fin.close();
  182.     assertTrue("strings are equal ", (b[0] == "b".getBytes()[0]));
  183.     fin = harFs.open(harFilec);
  184.     fin.read(b);
  185.     fin.close();
  186.     assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
  187.     // ok all files match 
  188.     // run a map reduce job
  189.     Path outdir = new Path(fs.getHomeDirectory(), "mapout"); 
  190.     JobConf jobconf = mapred.createJobConf();
  191.     FileInputFormat.addInputPath(jobconf, harPath);
  192.     jobconf.setInputFormat(TextInputFormat.class);
  193.     jobconf.setOutputFormat(TextOutputFormat.class);
  194.     FileOutputFormat.setOutputPath(jobconf, outdir);
  195.     jobconf.setMapperClass(TextMapperReducer.class);
  196.     jobconf.setMapOutputKeyClass(Text.class);
  197.     jobconf.setMapOutputValueClass(Text.class);
  198.     jobconf.setReducerClass(TextMapperReducer.class);
  199.     jobconf.setNumReduceTasks(1);
  200.     JobClient.runJob(jobconf);
  201.     args[1] = outdir.toString();
  202.     ret = ToolRunner.run(shell, args);
  203.     
  204.     FileStatus[] status = fs.globStatus(new Path(outdir, "part*"));
  205.     Path reduceFile = status[0].getPath();
  206.     FSDataInputStream reduceIn = fs.open(reduceFile);
  207.     b = new byte[6];
  208.     reduceIn.read(b);
  209.     //assuming all the 6 bytes were read.
  210.     Text readTxt = new Text(b);
  211.     assertTrue("anbncn".equals(readTxt.toString()));
  212.     assertTrue("number of bytes left should be -1", reduceIn.read(b) == -1);
  213.     reduceIn.close();
  214.   }
  215. }