HarFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:27k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.net.URI;
  22. import java.net.URISyntaxException;
  23. import java.util.ArrayList;
  24. import java.util.List;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.permission.FsPermission;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.util.LineReader;
  29. import org.apache.hadoop.util.Progressable;
  30. /**
  31.  * This is an implementation of the Hadoop Archive 
  32.  * Filesystem. This archive Filesystem has index files
  33.  * of the form _index* and has contents of the form
  34.  * part-*. The index files store the indexes of the 
  35.  * real files. The index files are of the form _masterindex
  36.  * and _index. The master index is a level of indirection 
  37.  * in to the index file to make the look ups faster. the index
  38.  * file is sorted with hash code of the paths that it contains 
  39.  * and the master index contains pointers to the positions in 
  40.  * index for ranges of hashcodes.
  41.  */
  42. public class HarFileSystem extends FilterFileSystem {
  43.   public static final int VERSION = 1;
  44.   // uri representation of this Har filesystem
  45.   private URI uri;
  46.   // the version of this har filesystem
  47.   private int version;
  48.   // underlying uri 
  49.   private URI underLyingURI;
  50.   // the top level path of the archive
  51.   // in the underlying file system
  52.   private Path archivePath;
  53.   // the masterIndex of the archive
  54.   private Path masterIndex;
  55.   // the index file 
  56.   private Path archiveIndex;
  57.   // the har auth
  58.   private String harAuth;
  59.   
  60.   /**
  61.    * public construction of harfilesystem
  62.    *
  63.    */
  64.   public HarFileSystem() {
  65.   }
  66.   
  67.   /**
  68.    * Constructor to create a HarFileSystem with an
  69.    * underlying filesystem.
  70.    * @param fs
  71.    */
  72.   public HarFileSystem(FileSystem fs) {
  73.     super(fs);
  74.   }
  75.   
  76.   /**
  77.    * Initialize a Har filesystem per har archive. The 
  78.    * archive home directory is the top level directory
  79.    * in the filesystem that contains the HAR archive.
  80.    * Be careful with this method, you do not want to go 
  81.    * on creating new Filesystem instances per call to 
  82.    * path.getFileSystem().
  83.    * the uri of Har is 
  84.    * har://underlyingfsscheme-host:port/archivepath.
  85.    * or 
  86.    * har:///archivepath. This assumes the underlying filesystem
  87.    * to be used in case not specified.
  88.    */
  89.   public void initialize(URI name, Configuration conf) throws IOException {
  90.     //decode the name
  91.     underLyingURI = decodeHarURI(name, conf);
  92.     //  we got the right har Path- now check if this is 
  93.     //truly a har filesystem
  94.     Path harPath = archivePath(new Path(name.toString()));
  95.     if (harPath == null) { 
  96.       throw new IOException("Invalid path for the Har Filesystem. " + 
  97.                            name.toString());
  98.     }
  99.     if (fs == null) {
  100.       fs = FileSystem.get(underLyingURI, conf);
  101.     }
  102.     this.uri = harPath.toUri();
  103.     this.archivePath = new Path(this.uri.getPath());
  104.     this.harAuth = getHarAuth(this.underLyingURI);
  105.     //check for the underlying fs containing
  106.     // the index file
  107.     this.masterIndex = new Path(archivePath, "_masterindex");
  108.     this.archiveIndex = new Path(archivePath, "_index");
  109.     if (!fs.exists(masterIndex) || !fs.exists(archiveIndex)) {
  110.       throw new IOException("Invalid path for the Har Filesystem. " +
  111.           "No index file in " + harPath);
  112.     }
  113.     try{ 
  114.       this.version = getHarVersion();
  115.     } catch(IOException io) {
  116.       throw new IOException("Unable to " +
  117.           "read the version of the Har file system: " + this.archivePath);
  118.     }
  119.     if (this.version != HarFileSystem.VERSION) {
  120.       throw new IOException("Invalid version " + 
  121.           this.version + " expected " + HarFileSystem.VERSION);
  122.     }
  123.   }
  124.   
  125.   // get the version of the filesystem from the masterindex file
  126.   // the version is currently not useful since its the first version 
  127.   // of archives
  128.   public int getHarVersion() throws IOException { 
  129.     FSDataInputStream masterIn = fs.open(masterIndex);
  130.     LineReader lmaster = new LineReader(masterIn, getConf());
  131.     Text line = new Text();
  132.     lmaster.readLine(line);
  133.     try {
  134.       masterIn.close();
  135.     } catch(IOException e){
  136.       //disregard it.
  137.       // its a read.
  138.     }
  139.     String versionLine = line.toString();
  140.     String[] arr = versionLine.split(" ");
  141.     int version = Integer.parseInt(arr[0]);
  142.     return version;
  143.   }
  144.   
  145.   /*
  146.    * find the parent path that is the 
  147.    * archive path in the path. The last
  148.    * path segment that ends with .har is 
  149.    * the path that will be returned.
  150.    */
  151.   private Path archivePath(Path p) {
  152.     Path retPath = null;
  153.     Path tmp = p;
  154.     for (int i=0; i< p.depth(); i++) {
  155.       if (tmp.toString().endsWith(".har")) {
  156.         retPath = tmp;
  157.         break;
  158.       }
  159.       tmp = tmp.getParent();
  160.     }
  161.     return retPath;
  162.   }
  163.   /**
  164.    * decode the raw URI to get the underlying URI
  165.    * @param rawURI raw Har URI
  166.    * @return filtered URI of the underlying fileSystem
  167.    */
  168.   private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
  169.     String tmpAuth = rawURI.getAuthority();
  170.     //we are using the default file
  171.     //system in the config 
  172.     //so create a underlying uri and 
  173.     //return it
  174.     if (tmpAuth == null) {
  175.       //create a path 
  176.       return FileSystem.getDefaultUri(conf);
  177.     }
  178.     String host = rawURI.getHost();
  179.     String[] str = host.split("-", 2);
  180.     if (str[0] == null) {
  181.       throw new IOException("URI: " + rawURI + " is an invalid Har URI.");
  182.     }
  183.     String underLyingScheme = str[0];
  184.     String underLyingHost = (str.length > 1)? str[1]:null;
  185.     int underLyingPort = rawURI.getPort();
  186.     String auth = (underLyingHost == null && underLyingPort == -1)?
  187.                   null:(underLyingHost+":"+underLyingPort);
  188.     URI tmp = null;
  189.     if (rawURI.getQuery() != null) {
  190.       // query component not allowed
  191.       throw new IOException("query component in Path not supported  " + rawURI);
  192.     }
  193.     try {
  194.       tmp = new URI(underLyingScheme, auth, rawURI.getPath(), 
  195.             rawURI.getQuery(), rawURI.getFragment());
  196.     } catch (URISyntaxException e) {
  197.         // do nothing should not happen
  198.     }
  199.     return tmp;
  200.   }
  201.   
  202.   /**
  203.    * return the top level archive.
  204.    */
  205.   public Path getWorkingDirectory() {
  206.     return new Path(uri.toString());
  207.   }
  208.   
  209.   /**
  210.    * Create a har specific auth 
  211.    * har-underlyingfs:port
  212.    * @param underLyingURI the uri of underlying
  213.    * filesystem
  214.    * @return har specific auth
  215.    */
  216.   private String getHarAuth(URI underLyingUri) {
  217.     String auth = underLyingUri.getScheme() + "-";
  218.     if (underLyingUri.getHost() != null) {
  219.       auth += underLyingUri.getHost() + ":";
  220.       if (underLyingUri.getPort() != -1) {
  221.         auth +=  underLyingUri.getPort();
  222.       }
  223.     }
  224.     else {
  225.       auth += ":";
  226.     }
  227.     return auth;
  228.   }
  229.   
  230.   /**
  231.    * Returns the uri of this filesystem.
  232.    * The uri is of the form 
  233.    * har://underlyingfsschema-host:port/pathintheunderlyingfs
  234.    */
  235.   @Override
  236.   public URI getUri() {
  237.     return this.uri;
  238.   }
  239.   
  240.   /**
  241.    * this method returns the path 
  242.    * inside the har filesystem.
  243.    * this is relative path inside 
  244.    * the har filesystem.
  245.    * @param path the fully qualified path in the har filesystem.
  246.    * @return relative path in the filesystem.
  247.    */
  248.   private Path getPathInHar(Path path) {
  249.     Path harPath = new Path(path.toUri().getPath());
  250.     if (archivePath.compareTo(harPath) == 0)
  251.       return new Path(Path.SEPARATOR);
  252.     Path tmp = new Path(harPath.getName());
  253.     Path parent = harPath.getParent();
  254.     while (!(parent.compareTo(archivePath) == 0)) {
  255.       if (parent.toString().equals(Path.SEPARATOR)) {
  256.         tmp = null;
  257.         break;
  258.       }
  259.       tmp = new Path(parent.getName(), tmp);
  260.       parent = parent.getParent();
  261.     }
  262.     if (tmp != null) 
  263.       tmp = new Path(Path.SEPARATOR, tmp);
  264.     return tmp;
  265.   }
  266.   
  267.   //the relative path of p. basically 
  268.   // getting rid of /. Parsing and doing 
  269.   // string manipulation is not good - so
  270.   // just use the path api to do it.
  271.   private Path makeRelative(String initial, Path p) {
  272.     Path root = new Path(Path.SEPARATOR);
  273.     if (root.compareTo(p) == 0)
  274.       return new Path(initial);
  275.     Path retPath = new Path(p.getName());
  276.     Path parent = p.getParent();
  277.     for (int i=0; i < p.depth()-1; i++) {
  278.       retPath = new Path(parent.getName(), retPath);
  279.       parent = parent.getParent();
  280.     }
  281.     return new Path(initial, retPath.toString());
  282.   }
  283.   
  284.   /* this makes a path qualified in the har filesystem
  285.    * (non-Javadoc)
  286.    * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
  287.    * org.apache.hadoop.fs.Path)
  288.    */
  289.   @Override
  290.   public Path makeQualified(Path path) {
  291.     // make sure that we just get the 
  292.     // path component 
  293.     Path fsPath = path;
  294.     if (!path.isAbsolute()) {
  295.       fsPath = new Path(archivePath, path);
  296.     }
  297.     URI tmpURI = fsPath.toUri();
  298.     fsPath = new Path(tmpURI.getPath());
  299.     //change this to Har uri 
  300.     URI tmp = null;
  301.     try {
  302.       tmp = new URI(uri.getScheme(), harAuth, fsPath.toString(),
  303.                     tmpURI.getQuery(), tmpURI.getFragment());
  304.     } catch(URISyntaxException ue) {
  305.       LOG.error("Error in URI ", ue);
  306.     }
  307.     if (tmp != null) {
  308.       return new Path(tmp.toString());
  309.     }
  310.     return null;
  311.   }
  312.   
  313.   /**
  314.    * get block locations from the underlying fs
  315.    * @param file the input filestatus to get block locations
  316.    * @param start the start in the file
  317.    * @param len the length in the file
  318.    * @return block locations for this segment of file
  319.    * @throws IOException
  320.    */
  321.   @Override
  322.   public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
  323.       long len) throws IOException {
  324.     // need to look up the file in the underlying fs
  325.     // look up the index 
  326.     
  327.     // make sure this is a prt of this har filesystem
  328.     Path p = makeQualified(file.getPath());
  329.     Path harPath = getPathInHar(p);
  330.     String line = fileStatusInIndex(harPath);
  331.     if (line == null)  {
  332.       throw new FileNotFoundException("File " + file.getPath() + " not found");
  333.     }
  334.     HarStatus harStatus = new HarStatus(line);
  335.     if (harStatus.isDir()) {
  336.       return new BlockLocation[0];
  337.     }
  338.     FileStatus fsFile = fs.getFileStatus(new Path(archivePath,
  339.         harStatus.getPartName()));
  340.     BlockLocation[] rawBlocks = fs.getFileBlockLocations(fsFile, 
  341.         harStatus.getStartIndex() + start, len);
  342.     return fakeBlockLocations(rawBlocks, harStatus.getStartIndex());
  343.   }
  344.   
  345.   /**
  346.    * fake the rawblocks since map reduce uses the block offsets to 
  347.    * fo some computations regarding the blocks
  348.    * @param rawBlocks the raw blocks returned by the filesystem
  349.    * @return faked blocks with changed offsets.
  350.    */
  351.   private BlockLocation[] fakeBlockLocations(BlockLocation[] rawBlocks, 
  352.   long startIndex) {
  353. for (BlockLocation block : rawBlocks) {
  354. long rawOffset = block.getOffset();
  355. block.setOffset(rawOffset - startIndex);
  356. }
  357. return rawBlocks;
  358.   }
  359.   
  360.   /**
  361.    * the hash of the path p inside iniside
  362.    * the filesystem
  363.    * @param p the path in the harfilesystem
  364.    * @return the hash code of the path.
  365.    */
  366.   public static int getHarHash(Path p) {
  367.     return (p.toString().hashCode() & 0x7fffffff);
  368.   }
  369.   
  370.   static class Store {
  371.     public Store() {
  372.       begin = end = startHash = endHash = 0;
  373.     }
  374.     public Store(long begin, long end, int startHash, int endHash) {
  375.       this.begin = begin;
  376.       this.end = end;
  377.       this.startHash = startHash;
  378.       this.endHash = endHash;
  379.     }
  380.     public long begin;
  381.     public long end;
  382.     public int startHash;
  383.     public int endHash;
  384.   }
  385.   
  386.   // make sure that this harPath is relative to the har filesystem
  387.   // this only works for relative paths. This returns the line matching
  388.   // the file in the index. Returns a null if there is not matching 
  389.   // filename in the index file.
  390.   private String fileStatusInIndex(Path harPath) throws IOException {
  391.     // read the index file 
  392.     int hashCode = getHarHash(harPath);
  393.     // get the master index to find the pos 
  394.     // in the index file
  395.     FSDataInputStream in = fs.open(masterIndex);
  396.     FileStatus masterStat = fs.getFileStatus(masterIndex);
  397.     LineReader lin = new LineReader(in, getConf());
  398.     Text line = new Text();
  399.     long read = lin.readLine(line);
  400.    //ignore the first line. this is the header of the index files
  401.     String[] readStr = null;
  402.     List<Store> stores = new ArrayList<Store>();
  403.     while(read < masterStat.getLen()) {
  404.       int b = lin.readLine(line);
  405.       read += b;
  406.       readStr = line.toString().split(" ");
  407.       int startHash = Integer.parseInt(readStr[0]);
  408.       int endHash  = Integer.parseInt(readStr[1]);
  409.       if (startHash <= hashCode && hashCode <= endHash) {
  410.         stores.add(new Store(Long.parseLong(readStr[2]), 
  411.             Long.parseLong(readStr[3]), startHash,
  412.             endHash));
  413.       }
  414.       line.clear();
  415.     }
  416.     try {
  417.       lin.close();
  418.     } catch(IOException io){
  419.       // do nothing just a read.
  420.     }
  421.     FSDataInputStream aIn = fs.open(archiveIndex);
  422.     LineReader aLin = new LineReader(aIn, getConf());
  423.     String retStr = null;
  424.     // now start reading the real index file
  425.      read = 0;
  426.     for (Store s: stores) {
  427.       aIn.seek(s.begin);
  428.       while (read + s.begin < s.end) {
  429.         int tmp = aLin.readLine(line);
  430.         read += tmp;
  431.         String lineFeed = line.toString();
  432.         String[] parsed = lineFeed.split(" ");
  433.         if (harPath.compareTo(new Path(parsed[0])) == 0) {
  434.           // bingo!
  435.           retStr = lineFeed;
  436.           break;
  437.         }
  438.         line.clear();
  439.       }
  440.       if (retStr != null)
  441.         break;
  442.     }
  443.     try {
  444.       aIn.close();
  445.     } catch(IOException io) {
  446.       //do nothing
  447.     }
  448.     return retStr;
  449.   }
  450.   
  451.   // a single line parser for hadoop archives status 
  452.   // stored in a single line in the index files 
  453.   // the format is of the form 
  454.   // filename "dir"/"file" partFileName startIndex length 
  455.   // <space seperated children>
  456.   private static class HarStatus {
  457.     boolean isDir;
  458.     String name;
  459.     List<String> children;
  460.     String partName;
  461.     long startIndex;
  462.     long length;
  463.     public HarStatus(String harString) {
  464.       String[] splits = harString.split(" ");
  465.       this.name = splits[0];
  466.       this.isDir = "dir".equals(splits[1]) ? true: false;
  467.       // this is equal to "none" if its a directory
  468.       this.partName = splits[2];
  469.       this.startIndex = Long.parseLong(splits[3]);
  470.       this.length = Long.parseLong(splits[4]);
  471.       if (isDir) {
  472.         children = new ArrayList<String>();
  473.         for (int i = 5; i < splits.length; i++) {
  474.           children.add(splits[i]);
  475.         }
  476.       }
  477.     }
  478.     public boolean isDir() {
  479.       return isDir;
  480.     }
  481.     
  482.     public String getName() {
  483.       return name;
  484.     }
  485.     
  486.     public List<String> getChildren() {
  487.       return children;
  488.     }
  489.     public String getFileName() {
  490.       return name;
  491.     }
  492.     public String getPartName() {
  493.       return partName;
  494.     }
  495.     public long getStartIndex() {
  496.       return startIndex;
  497.     }
  498.     public long getLength() {
  499.       return length;
  500.     }
  501.   }
  502.   
  503.   /**
  504.    * return the filestatus of files in har archive.
  505.    * The permission returned are that of the archive
  506.    * index files. The permissions are not persisted 
  507.    * while creating a hadoop archive.
  508.    * @param f the path in har filesystem
  509.    * @return filestatus.
  510.    * @throws IOException
  511.    */
  512.   @Override
  513.   public FileStatus getFileStatus(Path f) throws IOException {
  514.     FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
  515.     // get the fs DataInputStream for the underlying file
  516.     // look up the index.
  517.     Path p = makeQualified(f);
  518.     Path harPath = getPathInHar(p);
  519.     if (harPath == null) {
  520.       throw new IOException("Invalid file name: " + f + " in " + uri);
  521.     }
  522.     String readStr = fileStatusInIndex(harPath);
  523.     if (readStr == null) {
  524.       throw new FileNotFoundException("File: " +  f + " does not exist in " + uri);
  525.     }
  526.     HarStatus hstatus = null;
  527.     hstatus = new HarStatus(readStr);
  528.     return new FileStatus(hstatus.isDir()?0:hstatus.getLength(), hstatus.isDir(),
  529.         (int)archiveStatus.getReplication(), archiveStatus.getBlockSize(),
  530.         archiveStatus.getModificationTime(), archiveStatus.getAccessTime(),
  531.         new FsPermission(
  532.         archiveStatus.getPermission()), archiveStatus.getOwner(), 
  533.         archiveStatus.getGroup(), 
  534.             makeRelative(this.uri.toString(), new Path(hstatus.name)));
  535.   }
  536.   /**
  537.    * Returns a har input stream which fakes end of 
  538.    * file. It reads the index files to get the part 
  539.    * file name and the size and start of the file.
  540.    */
  541.   @Override
  542.   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  543.     // get the fs DataInputStream for the underlying file
  544.     // look up the index.
  545.     Path p = makeQualified(f);
  546.     Path harPath = getPathInHar(p);
  547.     if (harPath == null) {
  548.       throw new IOException("Invalid file name: " + f + " in " + uri);
  549.     }
  550.     String readStr = fileStatusInIndex(harPath);
  551.     if (readStr == null) {
  552.       throw new FileNotFoundException(f + ": not found in " + archivePath);
  553.     }
  554.     HarStatus hstatus = new HarStatus(readStr); 
  555.     // we got it.. woo hooo!!! 
  556.     if (hstatus.isDir()) {
  557.       throw new FileNotFoundException(f + " : not a file in " +
  558.                 archivePath);
  559.     }
  560.     return new HarFSDataInputStream(fs, new Path(archivePath, 
  561.         hstatus.getPartName()),
  562.         hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
  563.   }
  564.  
  565.   /*
  566.    * create throws an exception in Har filesystem.
  567.    * The archive once created cannot be changed.
  568.    */
  569.   public FSDataOutputStream create(Path f, int bufferSize) 
  570.                                     throws IOException {
  571.     throw new IOException("Har: Create not allowed");
  572.   }
  573.   
  574.   public FSDataOutputStream create(Path f,
  575.       FsPermission permission,
  576.       boolean overwrite,
  577.       int bufferSize,
  578.       short replication,
  579.       long blockSize,
  580.       Progressable progress) throws IOException {
  581.     throw new IOException("Har: create not allowed.");
  582.   }
  583.   
  584.   @Override
  585.   public void close() throws IOException {
  586.     if (fs != null) {
  587.       try {
  588.         fs.close();
  589.       } catch(IOException ie) {
  590.         //this might already be closed
  591.         // ignore
  592.       }
  593.     }
  594.   }
  595.   
  596.   /**
  597.    * Not implemented.
  598.    */
  599.   @Override
  600.   public boolean setReplication(Path src, short replication) throws IOException{
  601.     throw new IOException("Har: setreplication not allowed");
  602.   }
  603.   
  604.   /**
  605.    * Not implemented.
  606.    */
  607.   @Override
  608.   public boolean delete(Path f, boolean recursive) throws IOException { 
  609.     throw new IOException("Har: delete not allowed");
  610.   }
  611.   
  612.   /**
  613.    * liststatus returns the children of a directory 
  614.    * after looking up the index files.
  615.    */
  616.   @Override
  617.   public FileStatus[] listStatus(Path f) throws IOException {
  618.     //need to see if the file is an index in file
  619.     //get the filestatus of the archive directory
  620.     // we will create fake filestatuses to return
  621.     // to the client
  622.     List<FileStatus> statuses = new ArrayList<FileStatus>();
  623.     FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
  624.     Path tmpPath = makeQualified(f);
  625.     Path harPath = getPathInHar(tmpPath);
  626.     String readStr = fileStatusInIndex(harPath);
  627.     if (readStr == null) {
  628.       throw new FileNotFoundException("File " + f + " not found in " + archivePath);
  629.     }
  630.     HarStatus hstatus = new HarStatus(readStr);
  631.     if (!hstatus.isDir()) 
  632.         statuses.add(new FileStatus(hstatus.getLength(), 
  633.             hstatus.isDir(),
  634.             archiveStatus.getReplication(), archiveStatus.getBlockSize(),
  635.             archiveStatus.getModificationTime(), archiveStatus.getAccessTime(),
  636.             new FsPermission(archiveStatus.getPermission()),
  637.             archiveStatus.getOwner(), archiveStatus.getGroup(), 
  638.             makeRelative(this.uri.toString(), new Path(hstatus.name))));
  639.     else 
  640.       for (String child: hstatus.children) {
  641.         FileStatus tmp = getFileStatus(new Path(tmpPath, child));
  642.         statuses.add(tmp);
  643.       }
  644.     return statuses.toArray(new FileStatus[statuses.size()]);
  645.   }
  646.   
  647.   /**
  648.    * return the top level archive path.
  649.    */
  650.   public Path getHomeDirectory() {
  651.     return new Path(uri.toString());
  652.   }
  653.   
  654.   public void setWorkingDirectory(Path newDir) {
  655.     //does nothing.
  656.   }
  657.   
  658.   /**
  659.    * not implemented.
  660.    */
  661.   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  662.     throw new IOException("Har: mkdirs not allowed");
  663.   }
  664.   
  665.   /**
  666.    * not implemented.
  667.    */
  668.   public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws 
  669.         IOException {
  670.     throw new IOException("Har: copyfromlocalfile not allowed");
  671.   }
  672.   
  673.   /**
  674.    * copies the file in the har filesystem to a local file.
  675.    */
  676.   public void copyToLocalFile(boolean delSrc, Path src, Path dst) 
  677.     throws IOException {
  678.     FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
  679.   }
  680.   
  681.   /**
  682.    * not implemented.
  683.    */
  684.   public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 
  685.     throws IOException {
  686.     throw new IOException("Har: startLocalOutput not allowed");
  687.   }
  688.   
  689.   /**
  690.    * not implemented.
  691.    */
  692.   public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 
  693.     throws IOException {
  694.     throw new IOException("Har: completeLocalOutput not allowed");
  695.   }
  696.   
  697.   /**
  698.    * not implemented.
  699.    */
  700.   public void setOwner(Path p, String username, String groupname)
  701.     throws IOException {
  702.     throw new IOException("Har: setowner not allowed");
  703.   }
  704.   /**
  705.    * Not implemented.
  706.    */
  707.   public void setPermission(Path p, FsPermission permisssion) 
  708.     throws IOException {
  709.     throw new IOException("Har: setPermission not allowed");
  710.   }
  711.   
  712.   /**
  713.    * Hadoop archives input stream. This input stream fakes EOF 
  714.    * since archive files are part of bigger part files.
  715.    */
  716.   private static class HarFSDataInputStream extends FSDataInputStream {
  717.     /**
  718.      * Create an input stream that fakes all the reads/positions/seeking.
  719.      */
  720.     private static class HarFsInputStream extends FSInputStream {
  721.       private long position, start, end;
  722.       //The underlying data input stream that the
  723.       // underlying filesystem will return.
  724.       private FSDataInputStream underLyingStream;
  725.       //one byte buffer
  726.       private byte[] oneBytebuff = new byte[1];
  727.       HarFsInputStream(FileSystem fs, Path path, long start,
  728.           long length, int bufferSize) throws IOException {
  729.         underLyingStream = fs.open(path, bufferSize);
  730.         underLyingStream.seek(start);
  731.         // the start of this file in the part file
  732.         this.start = start;
  733.         // the position pointer in the part file
  734.         this.position = start;
  735.         // the end pointer in the part file
  736.         this.end = start + length;
  737.       }
  738.       
  739.       public synchronized int available() throws IOException {
  740.         long remaining = end - underLyingStream.getPos();
  741.         if (remaining > (long)Integer.MAX_VALUE) {
  742.           return Integer.MAX_VALUE;
  743.         }
  744.         return (int) remaining;
  745.       }
  746.       
  747.       public synchronized  void close() throws IOException {
  748.         underLyingStream.close();
  749.         super.close();
  750.       }
  751.       
  752.       //not implemented
  753.       @Override
  754.       public void mark(int readLimit) {
  755.         // do nothing 
  756.       }
  757.       
  758.       /**
  759.        * reset is not implemented
  760.        */
  761.       public void reset() throws IOException {
  762.         throw new IOException("reset not implemented.");
  763.       }
  764.       
  765.       public synchronized int read() throws IOException {
  766.         int ret = read(oneBytebuff, 0, 1);
  767.         return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
  768.       }
  769.       
  770.       public synchronized int read(byte[] b) throws IOException {
  771.         int ret = read(b, 0, b.length);
  772.         if (ret != -1) {
  773.           position += ret;
  774.         }
  775.         return ret;
  776.       }
  777.       
  778.       /**
  779.        * 
  780.        */
  781.       public synchronized int read(byte[] b, int offset, int len) 
  782.         throws IOException {
  783.         int newlen = len;
  784.         int ret = -1;
  785.         if (position + len > end) {
  786.           newlen = (int) (end - position);
  787.         }
  788.         // end case
  789.         if (newlen == 0) 
  790.           return ret;
  791.         ret = underLyingStream.read(b, offset, newlen);
  792.         position += ret;
  793.         return ret;
  794.       }
  795.       
  796.       public synchronized long skip(long n) throws IOException {
  797.         long tmpN = n;
  798.         if (tmpN > 0) {
  799.           if (position + tmpN > end) {
  800.             tmpN = end - position;
  801.           }
  802.           underLyingStream.seek(tmpN + position);
  803.           position += tmpN;
  804.           return tmpN;
  805.         }
  806.         return (tmpN < 0)? -1 : 0;
  807.       }
  808.       
  809.       public synchronized long getPos() throws IOException {
  810.         return (position - start);
  811.       }
  812.       
  813.       public synchronized void seek(long pos) throws IOException {
  814.         if (pos < 0 || (start + pos > end)) {
  815.           throw new IOException("Failed to seek: EOF");
  816.         }
  817.         position = start + pos;
  818.         underLyingStream.seek(position);
  819.       }
  820.       public boolean seekToNewSource(long targetPos) throws IOException {
  821.         //do not need to implement this
  822.         // hdfs in itself does seektonewsource 
  823.         // while reading.
  824.         return false;
  825.       }
  826.       
  827.       /**
  828.        * implementing position readable. 
  829.        */
  830.       public int read(long pos, byte[] b, int offset, int length) 
  831.       throws IOException {
  832.         int nlength = length;
  833.         if (start + nlength + pos > end) {
  834.           nlength = (int) (end - (start + pos));
  835.         }
  836.         return underLyingStream.read(pos + start , b, offset, nlength);
  837.       }
  838.       
  839.       /**
  840.        * position readable again.
  841.        */
  842.       public void readFully(long pos, byte[] b, int offset, int length) 
  843.       throws IOException {
  844.         if (start + length + pos > end) {
  845.           throw new IOException("Not enough bytes to read.");
  846.         }
  847.         underLyingStream.readFully(pos + start, b, offset, length);
  848.       }
  849.       
  850.       public void readFully(long pos, byte[] b) throws IOException {
  851.           readFully(pos, b, 0, b.length);
  852.       }
  853.       
  854.     }
  855.   
  856.     /**
  857.      * constructors for har input stream.
  858.      * @param fs the underlying filesystem
  859.      * @param p The path in the underlying filesystem
  860.      * @param start the start position in the part file
  861.      * @param length the length of valid data in the part file
  862.      * @param bufsize the buffer size
  863.      * @throws IOException
  864.      */
  865.     public HarFSDataInputStream(FileSystem fs, Path  p, long start, 
  866.         long length, int bufsize) throws IOException {
  867.         super(new HarFsInputStream(fs, p, start, length, bufsize));
  868.     }
  869.     /**
  870.      * constructor for har input stream.
  871.      * @param fs the underlying filesystem
  872.      * @param p the path in the underlying file system
  873.      * @param start the start position in the part file
  874.      * @param length the length of valid data in the part file.
  875.      * @throws IOException
  876.      */
  877.     public HarFSDataInputStream(FileSystem fs, Path  p, long start, long length)
  878.       throws IOException {
  879.         super(new HarFsInputStream(fs, p, start, length, 0));
  880.     }
  881.   }
  882. }