LocalStore.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.failmon;
  19. import java.io.BufferedOutputStream;
  20. import java.io.BufferedReader;
  21. import java.io.BufferedWriter;
  22. import java.io.File;
  23. import java.io.FileOutputStream;
  24. import java.io.FileReader;
  25. import java.io.FileWriter;
  26. import java.io.IOException;
  27. import java.net.InetAddress;
  28. import java.util.ArrayList;
  29. import java.util.Calendar;
  30. import java.util.zip.CRC32;
  31. import java.util.zip.CheckedOutputStream;
  32. import java.util.zip.ZipEntry;
  33. import java.util.zip.ZipOutputStream;
  34. import org.apache.hadoop.conf.Configuration;
  35. import org.apache.hadoop.fs.FileSystem;
  36. import org.apache.hadoop.fs.Path;
  37. /**********************************************************
  38.  * This class takes care of the temporary local storage of 
  39.  * gathered metrics before they get uploaded into HDFS. It writes 
  40.  * Serialized Records as lines in a temporary file and then 
  41.  * compresses and uploads it into HDFS.
  42.  * 
  43.  **********************************************************/
  44. public class LocalStore {
  45.   public final static char FIELD_SEPARATOR = '|';
  46.   public final static char RECORD_SEPARATOR = 'n';
  47.   public final static String COMPRESSION_SUFFIX = ".zip";
  48.   public final static int UPLOAD_INTERVAL = 600;
  49.   String filename;
  50.   String hdfsDir;
  51.   boolean compress;
  52.   FileWriter fw;
  53.   BufferedWriter writer;
  54.   /**
  55.    * Create an instance of the class and read the configuration
  56.    * file to determine some output parameters. Then, initiate the 
  57.    * structured needed for the buffered I/O (so that smal appends
  58.    * can be handled efficiently).
  59.    * 
  60.    */ 
  61.   public LocalStore() {
  62.     // determine the local output file name
  63.     if (Environment.getProperty("local.tmp.filename") == null)
  64.       Environment.setProperty("local.tmp.filename", "failmon.dat");
  65.     
  66.     // local.tmp.dir has been set by the Executor
  67.     if (Environment.getProperty("local.tmp.dir") == null)
  68.       Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));
  69.     
  70.     filename = Environment.getProperty("local.tmp.dir") + "/" +
  71.       Environment.getProperty("local.tmp.filename");
  72.     // determine the upload location
  73.     hdfsDir = Environment.getProperty("hdfs.upload.dir");
  74.     if (hdfsDir == null)
  75.       hdfsDir = "/failmon";
  76.     // determine if compression is enabled
  77.     compress = true;
  78.     if ("false".equalsIgnoreCase(Environment
  79.         .getProperty("local.tmp.compression")))
  80.       compress = false;
  81.     try {
  82.       fw = new FileWriter(filename, true);
  83.       writer = new BufferedWriter(fw);
  84.     } catch (IOException e) {
  85.       e.printStackTrace();
  86.     }
  87.   }
  88.   /**
  89.    * Insert an EventRecord to the local storage, after it
  90.    * gets serialized and anonymized.
  91.    * 
  92.    * @param er the EventRecord to be inserted
  93.    */ 
  94.   
  95.   public void insert(EventRecord er) {
  96.     SerializedRecord sr = new SerializedRecord(er);
  97.     try {
  98.       Anonymizer.anonymize(sr);
  99.     } catch (Exception e) {
  100.       e.printStackTrace();
  101.     }
  102.     append(sr);
  103.   }
  104.   /**
  105.    * Insert an array of EventRecords to the local storage, after they
  106.    * get serialized and anonymized.
  107.    * 
  108.    * @param ers the array of EventRecords to be inserted
  109.    */
  110.   public void insert(EventRecord[] ers) {
  111.     for (EventRecord er : ers)
  112.       insert(er);
  113.   }
  114.   private void append(SerializedRecord sr) {
  115.     try {
  116.       writer.write(pack(sr).toString());
  117.       writer.write(RECORD_SEPARATOR);
  118.       // writer.flush();
  119.     } catch (IOException e) {
  120.       e.printStackTrace();
  121.     }
  122.   }
  123.   /**
  124.    * Pack a SerializedRecord into an array of bytes
  125.    * 
  126.    * @param sr the SerializedRecord to be packed
  127.    */
  128.   public static StringBuffer pack(SerializedRecord sr) {
  129.     StringBuffer sb = new StringBuffer();
  130.     ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet());
  131.     if (sr.isValid())
  132.       SerializedRecord.arrangeKeys(keys);
  133.     for (int i = 0; i < keys.size(); i++) {
  134.       String value = sr.fields.get(keys.get(i));
  135.       sb.append(keys.get(i) + ":" + value);
  136.       sb.append(FIELD_SEPARATOR);
  137.     }
  138.     return sb;
  139.   }
  140.   /**
  141.    * Upload the local file store into HDFS, after it 
  142.    * compressing it. Then a new local file is created 
  143.    * as a temporary record store.
  144.    * 
  145.    */
  146.   public void upload() {
  147.     try {
  148.       writer.flush();
  149.       if (compress)
  150.         zipCompress(filename);
  151.       String remoteName = "failmon-";
  152.       if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames")))
  153.         remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-";
  154.       else
  155.         remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-"; 
  156.       remoteName += Calendar.getInstance().getTimeInMillis();//.toString();
  157.       if (compress)
  158. copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX);
  159.       else
  160. copyToHDFS(filename, hdfsDir + "/" + remoteName);
  161.     } catch (IOException e) {
  162.       e.printStackTrace();
  163.     }
  164.     // delete and re-open
  165.     try {
  166.       fw.close();
  167.       fw = new FileWriter(filename);
  168.       writer = new BufferedWriter(fw);
  169.     } catch (IOException e) {
  170.       e.printStackTrace();
  171.     }
  172.   }
  173.   
  174.   /**
  175.    * Compress a text file using the ZIP compressing algorithm.
  176.    * 
  177.    * @param filename the path to the file to be compressed
  178.    */
  179.   public static void zipCompress(String filename) throws IOException {
  180.     FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX);
  181.     CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32());
  182.     ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum));
  183.     out.setComment("Failmon records.");
  184.     BufferedReader in = new BufferedReader(new FileReader(filename));
  185.     out.putNextEntry(new ZipEntry(new File(filename).getName()));
  186.     int c;
  187.     while ((c = in.read()) != -1)
  188.       out.write(c);
  189.     in.close();
  190.     out.finish();
  191.     out.close();
  192.   }
  193.   /**
  194.    * Copy a local file to HDFS
  195.    * 
  196.    * @param localFile the filename of the local file
  197.    * @param hdfsFile the HDFS filename to copy to
  198.    */
  199.   public static void copyToHDFS(String localFile, String hdfsFile) throws IOException {
  200.     String hadoopConfPath; 
  201.     if (Environment.getProperty("hadoop.conf.path") == null)
  202.       hadoopConfPath = "../../../conf";
  203.     else
  204.       hadoopConfPath = Environment.getProperty("hadoop.conf.path");
  205.     // Read the configuration for the Hadoop environment
  206.     Configuration hadoopConf = new Configuration();
  207.     hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
  208.     hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
  209.     // System.out.println(hadoopConf.get("hadoop.tmp.dir"));
  210.     // System.out.println(hadoopConf.get("fs.default.name"));
  211.     FileSystem fs = FileSystem.get(hadoopConf);
  212.     // HadoopDFS deals with Path
  213.     Path inFile = new Path("file://" + localFile);
  214.     Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile);
  215.      // Read from and write to new file
  216.     Environment.logInfo("Uploading to HDFS (file " + outFile + ") ...");
  217.     fs.copyFromLocalFile(false, inFile, outFile);
  218.   }
  219.   /**
  220.    * Close the temporary local file
  221.    * 
  222.    */ 
  223.   public void close() {
  224.     try {
  225.     writer.flush();
  226.     writer.close();
  227.     } catch (IOException e) {
  228.       e.printStackTrace();
  229.     }
  230.   }
  231. }