DataBlockScanner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:30k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.BufferedReader;
  20. import java.io.Closeable;
  21. import java.io.DataOutputStream;
  22. import java.io.File;
  23. import java.io.FileNotFoundException;
  24. import java.io.FileOutputStream;
  25. import java.io.FileReader;
  26. import java.io.IOException;
  27. import java.io.PrintStream;
  28. import java.text.DateFormat;
  29. import java.text.SimpleDateFormat;
  30. import java.util.Arrays;
  31. import java.util.Collections;
  32. import java.util.Date;
  33. import java.util.HashMap;
  34. import java.util.Iterator;
  35. import java.util.Random;
  36. import java.util.TreeSet;
  37. import java.util.regex.Matcher;
  38. import java.util.regex.Pattern;
  39. import javax.servlet.http.HttpServlet;
  40. import javax.servlet.http.HttpServletRequest;
  41. import javax.servlet.http.HttpServletResponse;
  42. import org.apache.commons.logging.Log;
  43. import org.apache.commons.logging.LogFactory;
  44. import org.apache.hadoop.conf.Configuration;
  45. import org.apache.hadoop.hdfs.protocol.Block;
  46. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  47. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  48. import org.apache.hadoop.io.IOUtils;
  49. import org.apache.hadoop.util.StringUtils;
  50. /*
  51.  * This keeps track of blocks and their last verification times.
  52.  * Currently it does not modify the metadata for block.
  53.  */
  54. class DataBlockScanner implements Runnable {
  55.   
  56.   public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
  57.   
  58.   private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
  59.   private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
  60.   
  61.   static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
  62.   private static final long ONE_DAY = 24*3600*1000L;
  63.   
  64.   static final DateFormat dateFormat = 
  65.                     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
  66.   
  67.   static final String verificationLogFile = "dncp_block_verification.log";
  68.   static final int verficationLogLimit = 5; // * numBlocks.
  69.   private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
  70.   DataNode datanode;
  71.   FSDataset dataset;
  72.   
  73.   // sorted set
  74.   TreeSet<BlockScanInfo> blockInfoSet;
  75.   HashMap<Block, BlockScanInfo> blockMap;
  76.   
  77.   long totalScans = 0;
  78.   long totalVerifications = 0; // includes remote verification by clients.
  79.   long totalScanErrors = 0;
  80.   long totalTransientErrors = 0;
  81.   
  82.   long currentPeriodStart = System.currentTimeMillis();
  83.   long bytesLeft = 0; // Bytes to scan in this period
  84.   long totalBytesToScan = 0;
  85.   
  86.   private LogFileHandler verificationLog;
  87.   
  88.   Random random = new Random();
  89.   
  90.   BlockTransferThrottler throttler = null;
  91.   
  92.   private static enum ScanType {
  93.     REMOTE_READ,           // Verified when a block read by a client etc
  94.     VERIFICATION_SCAN,     // scanned as part of periodic verfication
  95.     NONE,
  96.   }
  97.   
  98.   static class BlockScanInfo implements Comparable<BlockScanInfo> {
  99.     Block block;
  100.     long lastScanTime = 0;
  101.     long lastLogTime = 0;
  102.     ScanType lastScanType = ScanType.NONE; 
  103.     boolean lastScanOk = true;
  104.     
  105.     BlockScanInfo(Block block) {
  106.       this.block = block;
  107.     }
  108.     
  109.     public int hashCode() {
  110.       return block.hashCode();
  111.     }
  112.     
  113.     public boolean equals(Object other) {
  114.       return other instanceof BlockScanInfo &&
  115.              compareTo((BlockScanInfo)other) == 0;
  116.     }
  117.     
  118.     long getLastScanTime() {
  119.       return ( lastScanType == ScanType.NONE) ? 0 : lastScanTime;
  120.     }
  121.     
  122.     public int compareTo(BlockScanInfo other) {
  123.       long t1 = lastScanTime;
  124.       long t2 = other.lastScanTime;
  125.       return ( t1 < t2 ) ? -1 : 
  126.                           (( t1 > t2 ) ? 1 : block.compareTo(other.block)); 
  127.     }
  128.   }
  129.   
  130.   DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) {
  131.     this.datanode = datanode;
  132.     this.dataset = dataset;
  133.     scanPeriod = conf.getInt("dfs.datanode.scan.period.hours", 0);
  134.     if ( scanPeriod <= 0 ) {
  135.       scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
  136.     }
  137.     scanPeriod *= 3600 * 1000;
  138.     // initialized when the scanner thread is started.
  139.   }
  140.   
  141.   private synchronized boolean isInitiliazed() {
  142.     return throttler != null;
  143.   }
  144.   
  145.   private void updateBytesToScan(long len, long lastScanTime) {
  146.     // len could be negative when a block is deleted.
  147.     totalBytesToScan += len;
  148.     if ( lastScanTime < currentPeriodStart ) {
  149.       bytesLeft += len;
  150.     }
  151.     // Should we change throttler bandwidth every time bytesLeft changes?
  152.     // not really required.
  153.   }
  154.   
  155.   private synchronized void addBlockInfo(BlockScanInfo info) {
  156.     boolean added = blockInfoSet.add(info);
  157.     blockMap.put(info.block, info);
  158.     
  159.     if ( added ) {
  160.       LogFileHandler log = verificationLog;
  161.       if (log != null) {
  162.         log.setMaxNumLines(blockMap.size() * verficationLogLimit);
  163.       }
  164.       updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
  165.     }
  166.   }
  167.   
  168.   private synchronized void delBlockInfo(BlockScanInfo info) {
  169.     boolean exists = blockInfoSet.remove(info);
  170.     blockMap.remove(info.block);
  171.     if ( exists ) {
  172.       LogFileHandler log = verificationLog;
  173.       if (log != null) {
  174.         log.setMaxNumLines(blockMap.size() * verficationLogLimit);
  175.       }
  176.       updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
  177.     }
  178.   }
  179.   
  180.   /** Update blockMap by the given LogEntry */
  181.   private synchronized void updateBlockInfo(LogEntry e) {
  182.     BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
  183.     
  184.     if(info != null && e.verificationTime > 0 && 
  185.         info.lastScanTime < e.verificationTime) {
  186.       delBlockInfo(info);
  187.       info.lastScanTime = e.verificationTime;
  188.       info.lastScanType = ScanType.VERIFICATION_SCAN;
  189.       addBlockInfo(info);
  190.     }
  191.   }
  192.   private void init() {
  193.     
  194.     // get the list of blocks and arrange them in random order
  195.     Block arr[] = dataset.getBlockReport();
  196.     Collections.shuffle(Arrays.asList(arr));
  197.     
  198.     blockInfoSet = new TreeSet<BlockScanInfo>();
  199.     blockMap = new HashMap<Block, BlockScanInfo>();
  200.     
  201.     long scanTime = -1;
  202.     for (Block block : arr) {
  203.       BlockScanInfo info = new BlockScanInfo( block );
  204.       info.lastScanTime = scanTime--; 
  205.       //still keep 'info.lastScanType' to NONE.
  206.       addBlockInfo(info);
  207.     }
  208.     /* Pick the first directory that has any existing scanner log.
  209.      * otherwise, pick the first directory.
  210.      */
  211.     File dir = null;
  212.     FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
  213.     for(FSDataset.FSVolume vol : volumes) {
  214.       if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
  215.         dir = vol.getDir();
  216.         break;
  217.       }
  218.     }
  219.     if (dir == null) {
  220.       dir = volumes[0].getDir();
  221.     }
  222.     
  223.     try {
  224.       // max lines will be updated later during initialization.
  225.       verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
  226.     } catch (IOException e) {
  227.       LOG.warn("Could not open verfication log. " +
  228.                "Verification times are not stored.");
  229.     }
  230.     
  231.     synchronized (this) {
  232.       throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
  233.     }
  234.   }
  235.   private synchronized long getNewBlockScanTime() {
  236.     /* If there are a lot of blocks, this returns a random time with in 
  237.      * the scan period. Otherwise something sooner.
  238.      */
  239.     long period = Math.min(scanPeriod, 
  240.                            Math.max(blockMap.size(),1) * 600 * 1000L);
  241.     return System.currentTimeMillis() - scanPeriod + 
  242.            random.nextInt((int)period);    
  243.   }
  244.   /** Adds block to list of blocks */
  245.   synchronized void addBlock(Block block) {
  246.     if (!isInitiliazed()) {
  247.       return;
  248.     }
  249.     
  250.     BlockScanInfo info = blockMap.get(block);
  251.     if ( info != null ) {
  252.       LOG.warn("Adding an already existing block " + block);
  253.       delBlockInfo(info);
  254.     }
  255.     
  256.     info = new BlockScanInfo(block);    
  257.     info.lastScanTime = getNewBlockScanTime();
  258.     
  259.     addBlockInfo(info);
  260.     adjustThrottler();
  261.   }
  262.   
  263.   /** Deletes the block from internal structures */
  264.   synchronized void deleteBlock(Block block) {
  265.     if (!isInitiliazed()) {
  266.       return;
  267.     }
  268.     BlockScanInfo info = blockMap.get(block);
  269.     if ( info != null ) {
  270.       delBlockInfo(info);
  271.     }
  272.   }
  273.   /** @return the last scan time */
  274.   synchronized long getLastScanTime(Block block) {
  275.     if (!isInitiliazed()) {
  276.       return 0;
  277.     }
  278.     BlockScanInfo info = blockMap.get(block);
  279.     return info == null? 0: info.lastScanTime;
  280.   }
  281.   /** Deletes blocks from internal structures */
  282.   void deleteBlocks(Block[] blocks) {
  283.     for ( Block b : blocks ) {
  284.       deleteBlock(b);
  285.     }
  286.   }
  287.   
  288.   void verifiedByClient(Block block) {
  289.     updateScanStatus(block, ScanType.REMOTE_READ, true);
  290.   }
  291.   
  292.   private synchronized void updateScanStatus(Block block, 
  293.                                              ScanType type,
  294.                                              boolean scanOk) {
  295.     BlockScanInfo info = blockMap.get(block);
  296.     
  297.     if ( info != null ) {
  298.       delBlockInfo(info);
  299.     } else {
  300.       // It might already be removed. Thats ok, it will be caught next time.
  301.       info = new BlockScanInfo(block);
  302.     }
  303.     
  304.     long now = System.currentTimeMillis();
  305.     info.lastScanType = type;
  306.     info.lastScanTime = now;
  307.     info.lastScanOk = scanOk;
  308.     addBlockInfo(info);
  309.     
  310.     if (type == ScanType.REMOTE_READ) {
  311.       totalVerifications++;
  312.     }
  313.         
  314.     // Don't update meta data too often in case of REMOTE_READ
  315.     // of if the verification failed.
  316.     long diff = now - info.lastLogTime;
  317.     if (!scanOk || (type == ScanType.REMOTE_READ &&
  318.                     diff < scanPeriod/3 && diff < ONE_DAY)) {
  319.       return;
  320.     }
  321.     
  322.     info.lastLogTime = now;
  323.     LogFileHandler log = verificationLog;
  324.     if (log != null) {
  325.       log.appendLine(LogEntry.newEnry(block, now));
  326.     }
  327.   }
  328.   
  329.   private void handleScanFailure(Block block) {
  330.     
  331.     LOG.info("Reporting bad block " + block + " to namenode.");
  332.     
  333.     try {
  334.       DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
  335.       LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
  336.       datanode.namenode.reportBadBlocks(blocks);
  337.     } catch (IOException e){
  338.       /* One common reason is that NameNode could be in safe mode.
  339.        * Should we keep on retrying in that case?
  340.        */
  341.       LOG.warn("Failed to report bad block " + block + " to namenode : " +
  342.                " Exception : " + StringUtils.stringifyException(e));
  343.     }
  344.   }
  345.   
  346.   static private class LogEntry {
  347.     long blockId = -1;
  348.     long verificationTime = -1;
  349.     long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
  350.     
  351.     /**
  352.      * The format consists of single line with multiple entries. each 
  353.      * entry is in the form : name="value".
  354.      * This simple text and easily extendable and easily parseable with a
  355.      * regex.
  356.      */
  357.     private static Pattern entryPattern = 
  358.       Pattern.compile("\G\s*([^=\p{Space}]+)="(.*?)"\s*");
  359.     
  360.     static String newEnry(Block block, long time) {
  361.       return "date="" + dateFormat.format(new Date(time)) + ""t " +
  362.              "time="" + time + ""t " +
  363.              "genstamp="" + block.getGenerationStamp() + ""t " +
  364.              "id="" + block.getBlockId() +""";
  365.     }
  366.     
  367.     static LogEntry parseEntry(String line) {
  368.       LogEntry entry = new LogEntry();
  369.       
  370.       Matcher matcher = entryPattern.matcher(line);
  371.       while (matcher.find()) {
  372.         String name = matcher.group(1);
  373.         String value = matcher.group(2);
  374.         
  375.         try {
  376.           if (name.equals("id")) {
  377.             entry.blockId = Long.valueOf(value);
  378.           } else if (name.equals("time")) {
  379.             entry.verificationTime = Long.valueOf(value);
  380.           } else if (name.equals("genstamp")) {
  381.             entry.genStamp = Long.valueOf(value);
  382.           }
  383.         } catch(NumberFormatException nfe) {
  384.           LOG.warn("Cannot parse line: " + line, nfe);
  385.           return null;
  386.         }
  387.       }
  388.       
  389.       return entry;
  390.     }
  391.   }
  392.   
  393.   private synchronized void adjustThrottler() {
  394.     long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();
  395.     long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
  396.     throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
  397.   }
  398.   
  399.   private void verifyBlock(Block block) {
  400.     
  401.     BlockSender blockSender = null;
  402.     /* In case of failure, attempt to read second time to reduce
  403.      * transient errors. How do we flush block data from kernel 
  404.      * buffers before the second read? 
  405.      */
  406.     for (int i=0; i<2; i++) {
  407.       boolean second = (i > 0);
  408.       
  409.       try {
  410.         adjustThrottler();
  411.         
  412.         blockSender = new BlockSender(block, 0, -1, false, 
  413.                                                false, true, datanode);
  414.         DataOutputStream out = 
  415.                 new DataOutputStream(new IOUtils.NullOutputStream());
  416.         
  417.         blockSender.sendBlock(out, null, throttler);
  418.         LOG.info((second ? "Second " : "") +
  419.                  "Verification succeeded for " + block);
  420.         
  421.         if ( second ) {
  422.           totalTransientErrors++;
  423.         }
  424.         
  425.         updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);
  426.         return;
  427.       } catch (IOException e) {
  428.         totalScanErrors++;
  429.         updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
  430.         // If the block does not exists anymore, then its not an error
  431.         if ( dataset.getFile(block) == null ) {
  432.           LOG.info("Verification failed for " + block + ". Its ok since " +
  433.           "it not in datanode dataset anymore.");
  434.           deleteBlock(block);
  435.           return;
  436.         }
  437.         LOG.warn((second ? "Second " : "First ") + 
  438.                  "Verification failed for " + block + ". Exception : " +
  439.                  StringUtils.stringifyException(e));
  440.         
  441.         if (second) {
  442.           datanode.getMetrics().blockVerificationFailures.inc(); 
  443.           handleScanFailure(block);
  444.           return;
  445.         } 
  446.       } finally {
  447.         IOUtils.closeStream(blockSender);
  448.         datanode.getMetrics().blocksVerified.inc();
  449.         totalScans++;
  450.         totalVerifications++;
  451.       }
  452.     }
  453.   }
  454.   
  455.   private synchronized long getEarliestScanTime() {
  456.     if ( blockInfoSet.size() > 0 ) {
  457.       return blockInfoSet.first().lastScanTime;
  458.     }
  459.     return Long.MAX_VALUE; 
  460.   }
  461.   
  462.   // Picks one block and verifies it
  463.   private void verifyFirstBlock() {
  464.     Block block = null;
  465.     synchronized (this) {
  466.       if ( blockInfoSet.size() > 0 ) {
  467.         block = blockInfoSet.first().block;
  468.       }
  469.     }
  470.     
  471.     if ( block != null ) {
  472.       verifyBlock(block);
  473.     }
  474.   }
  475.   
  476.   /** returns false if the process was interrupted
  477.    * because the thread is marked to exit.
  478.    */
  479.   private boolean assignInitialVerificationTimes() {
  480.     int numBlocks = 1;
  481.     synchronized (this) {
  482.       numBlocks = Math.max(blockMap.size(), 1);
  483.     }
  484.     
  485.     //First udpates the last verification times from the log file.
  486.     LogFileHandler.Reader logReader = null;
  487.     try {
  488.       if (verificationLog != null) {
  489.         logReader = verificationLog.new Reader(false);
  490.       }
  491.     } catch (IOException e) {
  492.       LOG.warn("Could not read previous verification times : " +
  493.                StringUtils.stringifyException(e));
  494.     }
  495.     
  496.     if (verificationLog != null) {
  497.       verificationLog.updateCurNumLines();
  498.     }
  499.     
  500.     try {
  501.     // update verification times from the verificationLog.
  502.     while (logReader != null && logReader.hasNext()) {
  503.       if (!datanode.shouldRun || Thread.interrupted()) {
  504.         return false;
  505.       }
  506.       LogEntry entry = LogEntry.parseEntry(logReader.next());
  507.       if (entry != null) {
  508.         updateBlockInfo(entry);
  509.       }
  510.     }
  511.     } finally {
  512.       IOUtils.closeStream(logReader);
  513.     }
  514.     
  515.     /* Initially spread the block reads over half of 
  516.      * MIN_SCAN_PERIOD so that we don't keep scanning the 
  517.      * blocks too quickly when restarted.
  518.      */
  519.     long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
  520.                                             10*60*1000 ));
  521.     long lastScanTime = System.currentTimeMillis() - scanPeriod;
  522.     
  523.     /* Before this loop, entries in blockInfoSet that are not
  524.      * updated above have lastScanTime of <= 0 . Loop until first entry has
  525.      * lastModificationTime > 0.
  526.      */    
  527.     synchronized (this) {
  528.       if (blockInfoSet.size() > 0 ) {
  529.         BlockScanInfo info;
  530.         while ((info =  blockInfoSet.first()).lastScanTime < 0) {
  531.           delBlockInfo(info);        
  532.           info.lastScanTime = lastScanTime;
  533.           lastScanTime += verifyInterval;
  534.           addBlockInfo(info);
  535.         }
  536.       }
  537.     }
  538.     
  539.     return true;
  540.   }
  541.   
  542.   private synchronized void startNewPeriod() {
  543.     LOG.info("Starting a new period : work left in prev period : " +
  544.              String.format("%.2f%%", (bytesLeft * 100.0)/totalBytesToScan));
  545.     // reset the byte counts :
  546.     bytesLeft = totalBytesToScan;
  547.     currentPeriodStart = System.currentTimeMillis();
  548.   }
  549.   
  550.   public void run() {
  551.     try {
  552.       
  553.       init();
  554.       
  555.       //Read last verification times
  556.       if (!assignInitialVerificationTimes()) {
  557.         return;
  558.       }
  559.       
  560.       adjustThrottler();
  561.       
  562.       while (datanode.shouldRun && !Thread.interrupted()) {
  563.         long now = System.currentTimeMillis();
  564.         synchronized (this) {
  565.           if ( now >= (currentPeriodStart + scanPeriod)) {
  566.             startNewPeriod();
  567.           }
  568.         }
  569.         if ( (now - getEarliestScanTime()) >= scanPeriod ) {
  570.           verifyFirstBlock();
  571.         } else {
  572.           try {
  573.             Thread.sleep(1000);
  574.           } catch (InterruptedException ignored) {}
  575.         }
  576.       }
  577.     } catch (RuntimeException e) {
  578.       LOG.warn("RuntimeException during DataBlockScanner.run() : " +
  579.                StringUtils.stringifyException(e));
  580.       throw e;
  581.     } finally {
  582.       shutdown();
  583.       LOG.info("Exiting DataBlockScanner thread.");
  584.     }
  585.   }
  586.   
  587.   synchronized void shutdown() {
  588.     LogFileHandler log = verificationLog;
  589.     verificationLog = null;
  590.     if (log != null) {
  591.       log.close();
  592.     }
  593.   }
  594.   
  595.   synchronized void printBlockReport(StringBuilder buffer, 
  596.                                      boolean summaryOnly) {
  597.     long oneHour = 3600*1000;
  598.     long oneDay = 24*oneHour;
  599.     long oneWeek = 7*oneDay;
  600.     long fourWeeks = 4*oneWeek;
  601.     
  602.     int inOneHour = 0;
  603.     int inOneDay = 0;
  604.     int inOneWeek = 0;
  605.     int inFourWeeks = 0;
  606.     int inScanPeriod = 0;
  607.     int neverScanned = 0;
  608.     
  609.     int total = blockInfoSet.size();
  610.     
  611.     long now = System.currentTimeMillis();
  612.     
  613.     Date date = new Date();
  614.     
  615.     for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
  616.       BlockScanInfo info = it.next();
  617.       
  618.       long scanTime = info.getLastScanTime();
  619.       long diff = now - scanTime;
  620.       
  621.       if (diff <= oneHour) inOneHour++;
  622.       if (diff <= oneDay) inOneDay++;
  623.       if (diff <= oneWeek) inOneWeek++;
  624.       if (diff <= fourWeeks) inFourWeeks++;
  625.       if (diff <= scanPeriod) inScanPeriod++;      
  626.       if (scanTime <= 0) neverScanned++;
  627.       
  628.       if (!summaryOnly) {
  629.         date.setTime(scanTime);
  630.         String scanType = 
  631.           (info.lastScanType == ScanType.REMOTE_READ) ? "remote" : 
  632.             ((info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" :
  633.               "none");
  634.         buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
  635.                                  " scan time : " +
  636.                                     "%-15d %sn", info.block, 
  637.                                     (info.lastScanOk ? "ok" : "failed"),
  638.                                     scanType, scanTime,
  639.                                     (scanTime <= 0) ? "not yet verified" : 
  640.                                       dateFormat.format(date)));
  641.       }
  642.     }
  643.     
  644.     double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
  645.                            *100.0/scanPeriod;
  646.     double pctProgress = (totalBytesToScan == 0) ? 100 :
  647.                          (totalBytesToScan-bytesLeft)*10000.0/totalBytesToScan/
  648.                          (100-pctPeriodLeft+1e-10);
  649.     
  650.     buffer.append(String.format("nTotal Blocks                 : %6d" +
  651.                                 "nVerified in last hour        : %6d" +
  652.                                 "nVerified in last day         : %6d" +
  653.                                 "nVerified in last week        : %6d" +
  654.                                 "nVerified in last four weeks  : %6d" +
  655.                                 "nVerified in SCAN_PERIOD      : %6d" +
  656.                                 "nNot yet verified             : %6d" +
  657.                                 "nVerified since restart       : %6d" +
  658.                                 "nScans since restart          : %6d" +
  659.                                 "nScan errors since restart    : %6d" +
  660.                                 "nTransient scan errors        : %6d" +
  661.                                 "nCurrent scan rate limit KBps : %6d" +
  662.                                 "nProgress this period         : %6.0f%%" +
  663.                                 "nTime left in cur period      : %6.2f%%" +
  664.                                 "n", 
  665.                                 total, inOneHour, inOneDay, inOneWeek,
  666.                                 inFourWeeks, inScanPeriod, neverScanned,
  667.                                 totalVerifications, totalScans, 
  668.                                 totalScanErrors, totalTransientErrors, 
  669.                                 Math.round(throttler.getBandwidth()/1024.0),
  670.                                 pctProgress, pctPeriodLeft));
  671.   }
  672.   
  673.   /**
  674.    * This class takes care of log file used to store the last verification
  675.    * times of the blocks. It rolls the current file when it is too big etc.
  676.    * If there is an error while writing, it stops updating with an error
  677.    * message.
  678.    */
  679.   private static class LogFileHandler {
  680.     
  681.     private static final String curFileSuffix = ".curr";
  682.     private static final String prevFileSuffix = ".prev";
  683.     
  684.     // Don't roll files more often than this
  685.     private static final long minRollingPeriod = 6 * 3600 * 1000L; // 6 hours
  686.     private static final long minWarnPeriod = minRollingPeriod;
  687.     private static final int minLineLimit = 1000;
  688.     
  689.     
  690.     static boolean isFilePresent(File dir, String filePrefix) {
  691.       return new File(dir, filePrefix + curFileSuffix).exists() ||
  692.              new File(dir, filePrefix + prevFileSuffix).exists();
  693.     }
  694.     private File curFile;
  695.     private File prevFile;
  696.     
  697.     private int maxNumLines = -1; // not very hard limit on number of lines.
  698.     private int curNumLines = -1;
  699.     
  700.     long lastWarningTime = 0;
  701.     
  702.     private PrintStream out;
  703.     
  704.     int numReaders = 0;
  705.         
  706.     /**
  707.      * Opens the log file for appending.
  708.      * Note that rolling will happen only after "updateLineCount()" is 
  709.      * called. This is so that line count could be updated in a separate
  710.      * thread without delaying start up.
  711.      * 
  712.      * @param dir where the logs files are located.
  713.      * @param filePrefix prefix of the file.
  714.      * @param maxNumLines max lines in a file (its a soft limit).
  715.      * @throws IOException
  716.      */
  717.     LogFileHandler(File dir, String filePrefix, int maxNumLines) 
  718.                                                 throws IOException {
  719.       curFile = new File(dir, filePrefix + curFileSuffix);
  720.       prevFile = new File(dir, filePrefix + prevFileSuffix);
  721.       openCurFile();
  722.       curNumLines = -1;
  723.       setMaxNumLines(maxNumLines);
  724.     }
  725.     
  726.     // setting takes affect when next entry is added.
  727.     synchronized void setMaxNumLines(int maxNumLines) {
  728.       this.maxNumLines = Math.max(maxNumLines, minLineLimit);
  729.     }
  730.     
  731.     /**
  732.      * Append "n" + line.
  733.      * If the log file need to be rolled, it will done after 
  734.      * appending the text.
  735.      * This does not throw IOException when there is an error while 
  736.      * appending. Currently does not throw an error even if rolling 
  737.      * fails (may be it should?).
  738.      * return true if append was successful.
  739.      */
  740.     synchronized boolean appendLine(String line) {
  741.       out.println();
  742.       out.print(line);
  743.       curNumLines += (curNumLines < 0) ? -1 : 1;
  744.       try {
  745.         rollIfRequired();
  746.       } catch (IOException e) {
  747.         warn("Rolling failed for " + curFile + " : " + e.getMessage());
  748.         return false;
  749.       }
  750.       return true;
  751.     }
  752.     
  753.     //warns only once in a while
  754.     synchronized private void warn(String msg) {
  755.       long now = System.currentTimeMillis();
  756.       if ((now - lastWarningTime) >= minWarnPeriod) {
  757.         lastWarningTime = now;
  758.         LOG.warn(msg);
  759.       }
  760.     }
  761.     
  762.     private synchronized void openCurFile() throws FileNotFoundException {
  763.       close();
  764.       out = new PrintStream(new FileOutputStream(curFile, true));
  765.     }
  766.     
  767.     //This reads the current file and updates the count.
  768.     void updateCurNumLines() {
  769.       int count = 0;
  770.       Reader it = null;
  771.       try {
  772.         for(it = new Reader(true); it.hasNext(); count++) {
  773.           it.next();
  774.         }
  775.       } catch (IOException e) {
  776.         
  777.       } finally {
  778.         synchronized (this) {
  779.           curNumLines = count;
  780.         }
  781.         IOUtils.closeStream(it);
  782.       }
  783.     }
  784.     
  785.     private void rollIfRequired() throws IOException {
  786.       if (curNumLines < maxNumLines || numReaders > 0) {
  787.         return;
  788.       }
  789.       
  790.       long now = System.currentTimeMillis();
  791.       if (now < minRollingPeriod) {
  792.         return;
  793.       }
  794.       
  795.       if (!prevFile.delete() && prevFile.exists()) {
  796.         throw new IOException("Could not delete " + prevFile);
  797.       }
  798.       
  799.       close();
  800.       if (!curFile.renameTo(prevFile)) {
  801.         openCurFile();
  802.         throw new IOException("Could not rename " + curFile + 
  803.                               " to " + prevFile);
  804.       }
  805.       
  806.       openCurFile();
  807.       updateCurNumLines();
  808.     }
  809.     
  810.     synchronized void close() {
  811.       if (out != null) {
  812.         out.close();
  813.         out = null;
  814.       }
  815.     }
  816.     
  817.     /**
  818.      * This is used to read the lines in order.
  819.      * If the data is not read completely (i.e, untill hasNext() returns
  820.      * false), it needs to be explicitly 
  821.      */
  822.     private class Reader implements Iterator<String>, Closeable {
  823.       
  824.       BufferedReader reader;
  825.       File file;
  826.       String line;
  827.       boolean closed = false;
  828.       
  829.       private Reader(boolean skipPrevFile) throws IOException {
  830.         synchronized (LogFileHandler.this) {
  831.           numReaders++; 
  832.         }
  833.         reader = null;
  834.         file = (skipPrevFile) ? curFile : prevFile;
  835.         readNext();        
  836.       }
  837.       
  838.       private boolean openFile() throws IOException {
  839.         for(int i=0; i<2; i++) {
  840.           if (reader != null || i > 0) {
  841.             // move to next file
  842.             file = (file == prevFile) ? curFile : null;
  843.           }
  844.           if (file == null) {
  845.             return false;
  846.           }
  847.           if (file.exists()) {
  848.             break;
  849.           }
  850.         }
  851.         
  852.         if (reader != null ) {
  853.           reader.close();
  854.           reader = null;
  855.         }
  856.         
  857.         reader = new BufferedReader(new FileReader(file));
  858.         return true;
  859.       }
  860.       
  861.       // read next line if possible.
  862.       private void readNext() throws IOException {
  863.         line = null;
  864.         try {
  865.           if (reader != null && (line = reader.readLine()) != null) {
  866.             return;
  867.           }
  868.           if (line == null) {
  869.             // move to the next file.
  870.             if (openFile()) {
  871.               readNext();
  872.             }
  873.           }
  874.         } finally {
  875.           if (!hasNext()) {
  876.             close();
  877.           }
  878.         }
  879.       }
  880.       
  881.       public boolean hasNext() {
  882.         return line != null;
  883.       }
  884.       public String next() {
  885.         String curLine = line;
  886.         try {
  887.           readNext();
  888.         } catch (IOException e) {
  889.           LOG.info("Could not reade next line in LogHandler : " +
  890.                    StringUtils.stringifyException(e));
  891.         }
  892.         return curLine;
  893.       }
  894.       public void remove() {
  895.         throw new RuntimeException("remove() is not supported.");
  896.       }
  897.       public void close() throws IOException {
  898.         if (!closed) {
  899.           try {
  900.             if (reader != null) {
  901.               reader.close();
  902.             }
  903.           } finally {
  904.             file = null;
  905.             reader = null;
  906.             closed = true;
  907.             synchronized (LogFileHandler.this) {
  908.               numReaders--;
  909.               assert(numReaders >= 0);
  910.             }
  911.           }
  912.         }
  913.       }
  914.     }    
  915.   }
  916.   
  917.   public static class Servlet extends HttpServlet {
  918.     
  919.     public void doGet(HttpServletRequest request, 
  920.                       HttpServletResponse response) throws IOException {
  921.       
  922.       response.setContentType("text/plain");
  923.       
  924.       DataBlockScanner blockScanner = (DataBlockScanner)  
  925.           getServletContext().getAttribute("datanode.blockScanner");
  926.       
  927.       boolean summary = (request.getParameter("listblocks") == null);
  928.       
  929.       StringBuilder buffer = new StringBuilder(8*1024);
  930.       if (blockScanner == null) {
  931.         buffer.append("Periodic block scanner is not running. " +
  932.                       "Please check the datanode log if this is unexpected.");
  933.       } else if (blockScanner.isInitiliazed()) {
  934.         blockScanner.printBlockReport(buffer, summary);
  935.       } else {
  936.         buffer.append("Periodic block scanner is not yet initialized. " +
  937.                       "Please check back again after some time.");
  938.       }
  939.       response.getWriter().write(buffer.toString()); // extra copy!
  940.     }
  941.   }
  942. }