HDFSMerger.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.failmon;
  19. import java.io.BufferedOutputStream;
  20. import java.io.InputStream;
  21. import java.io.BufferedReader;
  22. import java.io.BufferedWriter;
  23. import java.io.File;
  24. import java.io.FileOutputStream;
  25. import java.io.FileReader;
  26. import java.io.FileWriter;
  27. import java.io.IOException;
  28. import java.io.FileNotFoundException;
  29. import java.net.InetAddress;
  30. import java.util.ArrayList;
  31. import java.util.Calendar;
  32. import java.util.zip.CRC32;
  33. import java.util.zip.CheckedOutputStream;
  34. import java.util.zip.ZipEntry;
  35. import java.util.zip.ZipInputStream;
  36. import org.apache.hadoop.conf.Configuration;
  37. import org.apache.hadoop.fs.FileSystem;
  38. import org.apache.hadoop.fs.FileStatus;
  39. import org.apache.hadoop.fs.Path;
  40. import org.apache.hadoop.fs.FSDataOutputStream;
  41. import org.apache.hadoop.fs.FSDataInputStream;
  42. public class HDFSMerger {
  43.   Configuration hadoopConf;
  44.   FileSystem hdfs;
  45.   
  46.   String hdfsDir;
  47.   
  48.   FileStatus [] inputFiles;
  49.   Path outputFilePath;
  50.   FSDataOutputStream outputFile;
  51.     
  52.   boolean compress;
  53.   FileWriter fw;
  54.   BufferedWriter writer;
  55.   public HDFSMerger() throws IOException {
  56.     String hadoopConfPath; 
  57.     if (Environment.getProperty("hadoop.conf.path") == null)
  58.       hadoopConfPath = "../../../conf";
  59.     else
  60.       hadoopConfPath = Environment.getProperty("hadoop.conf.path");
  61.     // Read the configuration for the Hadoop environment
  62.     Configuration hadoopConf = new Configuration();
  63.     hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
  64.     hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
  65.     
  66.     // determine the local output file name
  67.     if (Environment.getProperty("local.tmp.filename") == null)
  68.       Environment.setProperty("local.tmp.filename", "failmon.dat");
  69.     
  70.     // determine the upload location
  71.     hdfsDir = Environment.getProperty("hdfs.upload.dir");
  72.     if (hdfsDir == null)
  73.       hdfsDir = "/failmon";
  74.     hdfs = FileSystem.get(hadoopConf);
  75.     
  76.     Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir);
  77.     try {
  78.       if (!hdfs.getFileStatus(hdfsDirPath).isDir()) {
  79. Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString());
  80. System.exit(0);
  81.       }
  82.     } catch (FileNotFoundException e) {
  83.       Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString());
  84.     }
  85.     inputFiles = hdfs.listStatus(hdfsDirPath);
  86.     outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-"
  87.   + Calendar.getInstance().getTimeInMillis() + ".dat");
  88.     outputFile = hdfs.create(outputFilePath);
  89.     
  90.     for (FileStatus fstatus : inputFiles) {
  91.       appendFile(fstatus.getPath());
  92.       hdfs.delete(fstatus.getPath());
  93.     }
  94.     outputFile.close();
  95.     Environment.logInfo("HDFS file merging complete!");
  96.   }
  97.   private void appendFile (Path inputPath) throws IOException {
  98.     
  99.     FSDataInputStream anyInputFile = hdfs.open(inputPath);
  100.     InputStream inputFile;
  101.     byte buffer[] = new byte[4096];
  102.     
  103.     if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) {
  104.       // the file is compressed
  105.       inputFile = new ZipInputStream(anyInputFile);
  106.       ((ZipInputStream) inputFile).getNextEntry();
  107.     } else {
  108.       inputFile = anyInputFile;
  109.     }
  110.     
  111.     try {
  112.       int bytesRead = 0;
  113.       while ((bytesRead = inputFile.read(buffer)) > 0) {
  114. outputFile.write(buffer, 0, bytesRead);
  115.       }
  116.     } catch (IOException e) {
  117.       Environment.logInfo("Error while copying file:" + inputPath.toString());
  118.     } finally {
  119.       inputFile.close();
  120.     }    
  121.   }
  122.   
  123.   public static void main(String [] args) {
  124.     Environment.prepare("./conf/failmon.properties");
  125.     try {
  126.       new HDFSMerger();
  127.     } catch (IOException e) {
  128.       e.printStackTrace();
  129.       }
  130.   }
  131. }