TestFileCreationClient.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import org.apache.commons.logging.impl.Log4JLogger;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.fs.FSDataInputStream;
  22. import org.apache.hadoop.fs.FSDataOutputStream;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  26. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  27. import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
  28. import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
  29. import org.apache.hadoop.io.IOUtils;
  30. import org.apache.log4j.Level;
  31. /**
  32.  * This class tests that a file need not be closed before its
  33.  * data can be read by another client.
  34.  */
  35. public class TestFileCreationClient extends junit.framework.TestCase {
  36.   static final String DIR = "/" + TestFileCreationClient.class.getSimpleName() + "/";
  37.   {
  38.     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  39.     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  40.     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  41.     ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
  42.   }
  43.   /** Test lease recovery Triggered by DFSClient. */
  44.   public void testClientTriggeredLeaseRecovery() throws Exception {
  45.     final int REPLICATION = 3;
  46.     Configuration conf = new Configuration();
  47.     conf.setInt("dfs.datanode.handler.count", 1);
  48.     conf.setInt("dfs.replication", REPLICATION);
  49.     MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
  50.     try {
  51.       final FileSystem fs = cluster.getFileSystem();
  52.       final Path dir = new Path("/wrwelkj");
  53.       
  54.       SlowWriter[] slowwriters = new SlowWriter[10];
  55.       for(int i = 0; i < slowwriters.length; i++) {
  56.         slowwriters[i] = new SlowWriter(fs, new Path(dir, "file" + i));
  57.       }
  58.       try {
  59.         for(int i = 0; i < slowwriters.length; i++) {
  60.           slowwriters[i].start();
  61.         }
  62.         Thread.sleep(1000);                       // let writers get started
  63.         //stop a datanode, it should have least recover.
  64.         cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));
  65.         
  66.         //let the slow writer writes a few more seconds
  67.         System.out.println("Wait a few seconds");
  68.         Thread.sleep(5000);
  69.       }
  70.       finally {
  71.         for(int i = 0; i < slowwriters.length; i++) {
  72.           if (slowwriters[i] != null) {
  73.             slowwriters[i].running = false;
  74.             slowwriters[i].interrupt();
  75.           }
  76.         }
  77.         for(int i = 0; i < slowwriters.length; i++) {
  78.           if (slowwriters[i] != null) {
  79.             slowwriters[i].join();
  80.           }
  81.         }
  82.       }
  83.       //Verify the file
  84.       System.out.println("Verify the file");
  85.       for(int i = 0; i < slowwriters.length; i++) {
  86.         System.out.println(slowwriters[i].filepath + ": length="
  87.             + fs.getFileStatus(slowwriters[i].filepath).getLen());
  88.         FSDataInputStream in = null;
  89.         try {
  90.           in = fs.open(slowwriters[i].filepath);
  91.           for(int j = 0, x; (x = in.read()) != -1; j++) {
  92.             assertEquals(j, x);
  93.           }
  94.         }
  95.         finally {
  96.           IOUtils.closeStream(in);
  97.         }
  98.       }
  99.     } finally {
  100.       if (cluster != null) {cluster.shutdown();}
  101.     }
  102.   }
  103.   static class SlowWriter extends Thread {
  104.     final FileSystem fs;
  105.     final Path filepath;
  106.     boolean running = true;
  107.     
  108.     SlowWriter(FileSystem fs, Path filepath) {
  109.       super(SlowWriter.class.getSimpleName() + ":" + filepath);
  110.       this.fs = fs;
  111.       this.filepath = filepath;
  112.     }
  113.     public void run() {
  114.       FSDataOutputStream out = null;
  115.       int i = 0;
  116.       try {
  117.         out = fs.create(filepath);
  118.         for(; running; i++) {
  119.           System.out.println(getName() + " writes " + i);
  120.           out.write(i);
  121.           out.sync();
  122.           sleep(100);
  123.         }
  124.       }
  125.       catch(Exception e) {
  126.         System.out.println(getName() + " dies: e=" + e);
  127.       }
  128.       finally {
  129.         System.out.println(getName() + ": i=" + i);
  130.         IOUtils.closeStream(out);
  131.       }
  132.     }        
  133.   }
  134. }