InMemoryNativeFileSystemStore.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3native;
  19. import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER;
  20. import java.io.BufferedInputStream;
  21. import java.io.BufferedOutputStream;
  22. import java.io.ByteArrayOutputStream;
  23. import java.io.File;
  24. import java.io.FileInputStream;
  25. import java.io.FileOutputStream;
  26. import java.io.IOException;
  27. import java.io.InputStream;
  28. import java.net.URI;
  29. import java.util.ArrayList;
  30. import java.util.Iterator;
  31. import java.util.List;
  32. import java.util.SortedMap;
  33. import java.util.SortedSet;
  34. import java.util.TreeMap;
  35. import java.util.TreeSet;
  36. import java.util.Map.Entry;
  37. import org.apache.hadoop.conf.Configuration;
  38. /**
  39.  * <p>
  40.  * A stub implementation of {@link NativeFileSystemStore} for testing
  41.  * {@link NativeS3FileSystem} without actually connecting to S3.
  42.  * </p>
  43.  */
  44. class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
  45.   
  46.   private Configuration conf;
  47.   
  48.   private SortedMap<String, FileMetadata> metadataMap =
  49.     new TreeMap<String, FileMetadata>();
  50.   private SortedMap<String, byte[]> dataMap = new TreeMap<String, byte[]>();
  51.   public void initialize(URI uri, Configuration conf) throws IOException {
  52.     this.conf = conf;
  53.   }
  54.   public void storeEmptyFile(String key) throws IOException {
  55.     metadataMap.put(key, new FileMetadata(key, 0, System.currentTimeMillis()));
  56.     dataMap.put(key, new byte[0]);
  57.   }
  58.   public void storeFile(String key, File file, byte[] md5Hash)
  59.     throws IOException {
  60.     
  61.     ByteArrayOutputStream out = new ByteArrayOutputStream();
  62.     byte[] buf = new byte[8192];
  63.     int numRead;
  64.     BufferedInputStream in = null;
  65.     try {
  66.       in = new BufferedInputStream(new FileInputStream(file));
  67.       while ((numRead = in.read(buf)) >= 0) {
  68.         out.write(buf, 0, numRead);
  69.       }
  70.     } finally {
  71.       if (in != null) {
  72.         in.close();
  73.       }
  74.     }
  75.     metadataMap.put(key,
  76.         new FileMetadata(key, file.length(), System.currentTimeMillis()));
  77.     dataMap.put(key, out.toByteArray());
  78.   }
  79.   public InputStream retrieve(String key) throws IOException {
  80.     return retrieve(key, 0);
  81.   }
  82.   
  83.   public InputStream retrieve(String key, long byteRangeStart)
  84.     throws IOException {
  85.     
  86.     byte[] data = dataMap.get(key);
  87.     File file = createTempFile();
  88.     BufferedOutputStream out = null;
  89.     try {
  90.       out = new BufferedOutputStream(new FileOutputStream(file));
  91.       out.write(data, (int) byteRangeStart,
  92.           data.length - (int) byteRangeStart);
  93.     } finally {
  94.       if (out != null) {
  95.         out.close();
  96.       }
  97.     }
  98.     return new FileInputStream(file);
  99.   }
  100.   
  101.   private File createTempFile() throws IOException {
  102.     File dir = new File(conf.get("fs.s3.buffer.dir"));
  103.     if (!dir.exists() && !dir.mkdirs()) {
  104.       throw new IOException("Cannot create S3 buffer directory: " + dir);
  105.     }
  106.     File result = File.createTempFile("test-", ".tmp", dir);
  107.     result.deleteOnExit();
  108.     return result;
  109.   }
  110.   public FileMetadata retrieveMetadata(String key) throws IOException {
  111.     return metadataMap.get(key);
  112.   }
  113.   public PartialListing list(String prefix, int maxListingLength)
  114.       throws IOException {
  115.     return list(prefix, maxListingLength, null);
  116.   }
  117.   public PartialListing list(String prefix, int maxListingLength,
  118.       String priorLastKey) throws IOException {
  119.     return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
  120.   }
  121.   public PartialListing listAll(String prefix, int maxListingLength,
  122.       String priorLastKey) throws IOException {
  123.     return list(prefix, null, maxListingLength, priorLastKey);
  124.   }
  125.   private PartialListing list(String prefix, String delimiter,
  126.       int maxListingLength, String priorLastKey) throws IOException {
  127.     if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) {
  128.       prefix += PATH_DELIMITER;
  129.     }
  130.     
  131.     List<FileMetadata> metadata = new ArrayList<FileMetadata>();
  132.     SortedSet<String> commonPrefixes = new TreeSet<String>();
  133.     for (String key : dataMap.keySet()) {
  134.       if (key.startsWith(prefix)) {
  135.         if (delimiter == null) {
  136.           metadata.add(retrieveMetadata(key));
  137.         } else {
  138.           int delimIndex = key.indexOf(delimiter, prefix.length());
  139.           if (delimIndex == -1) {
  140.             metadata.add(retrieveMetadata(key));
  141.           } else {
  142.             String commonPrefix = key.substring(0, delimIndex);
  143.             commonPrefixes.add(commonPrefix);
  144.           }
  145.         }
  146.       }
  147.       if (metadata.size() + commonPrefixes.size() == maxListingLength) {
  148.         new PartialListing(key, metadata.toArray(new FileMetadata[0]),
  149.             commonPrefixes.toArray(new String[0]));
  150.       }
  151.     }
  152.     return new PartialListing(null, metadata.toArray(new FileMetadata[0]),
  153.         commonPrefixes.toArray(new String[0]));
  154.   }
  155.   public void delete(String key) throws IOException {
  156.     metadataMap.remove(key);
  157.     dataMap.remove(key);
  158.   }
  159.   public void rename(String srcKey, String dstKey) throws IOException {
  160.     metadataMap.put(dstKey, metadataMap.remove(srcKey));
  161.     dataMap.put(dstKey, dataMap.remove(srcKey));
  162.   }
  163.   
  164.   public void purge(String prefix) throws IOException {
  165.     Iterator<Entry<String, FileMetadata>> i =
  166.       metadataMap.entrySet().iterator();
  167.     while (i.hasNext()) {
  168.       Entry<String, FileMetadata> entry = i.next();
  169.       if (entry.getKey().startsWith(prefix)) {
  170.         dataMap.remove(entry.getKey());
  171.         i.remove();
  172.       }
  173.     }
  174.   }
  175.   public void dump() throws IOException {
  176.     System.out.println(metadataMap.values());
  177.     System.out.println(dataMap.keySet());
  178.   }
  179. }