CompositeInputSplit.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.HashSet;
  23. import org.apache.hadoop.io.Text;
  24. import org.apache.hadoop.io.WritableUtils;
  25. import org.apache.hadoop.mapred.InputSplit;
  26. import org.apache.hadoop.util.ReflectionUtils;
  27. /**
  28.  * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
  29.  * into this collection must have a public default constructor.
  30.  */
  31. public class CompositeInputSplit implements InputSplit {
  32.   private int fill = 0;
  33.   private long totsize = 0L;
  34.   private InputSplit[] splits;
  35.   public CompositeInputSplit() { }
  36.   public CompositeInputSplit(int capacity) {
  37.     splits = new InputSplit[capacity];
  38.   }
  39.   /**
  40.    * Add an InputSplit to this collection.
  41.    * @throws IOException If capacity was not specified during construction
  42.    *                     or if capacity has been reached.
  43.    */
  44.   public void add(InputSplit s) throws IOException {
  45.     if (null == splits) {
  46.       throw new IOException("Uninitialized InputSplit");
  47.     }
  48.     if (fill == splits.length) {
  49.       throw new IOException("Too many splits");
  50.     }
  51.     splits[fill++] = s;
  52.     totsize += s.getLength();
  53.   }
  54.   /**
  55.    * Get ith child InputSplit.
  56.    */
  57.   public InputSplit get(int i) {
  58.     return splits[i];
  59.   }
  60.   /**
  61.    * Return the aggregate length of all child InputSplits currently added.
  62.    */
  63.   public long getLength() throws IOException {
  64.     return totsize;
  65.   }
  66.   /**
  67.    * Get the length of ith child InputSplit.
  68.    */
  69.   public long getLength(int i) throws IOException {
  70.     return splits[i].getLength();
  71.   }
  72.   /**
  73.    * Collect a set of hosts from all child InputSplits.
  74.    */
  75.   public String[] getLocations() throws IOException {
  76.     HashSet<String> hosts = new HashSet<String>();
  77.     for (InputSplit s : splits) {
  78.       String[] hints = s.getLocations();
  79.       if (hints != null && hints.length > 0) {
  80.         for (String host : hints) {
  81.           hosts.add(host);
  82.         }
  83.       }
  84.     }
  85.     return hosts.toArray(new String[hosts.size()]);
  86.   }
  87.   /**
  88.    * getLocations from ith InputSplit.
  89.    */
  90.   public String[] getLocation(int i) throws IOException {
  91.     return splits[i].getLocations();
  92.   }
  93.   /**
  94.    * Write splits in the following format.
  95.    * {@code
  96.    * <count><class1><class2>...<classn><split1><split2>...<splitn>
  97.    * }
  98.    */
  99.   public void write(DataOutput out) throws IOException {
  100.     WritableUtils.writeVInt(out, splits.length);
  101.     for (InputSplit s : splits) {
  102.       Text.writeString(out, s.getClass().getName());
  103.     }
  104.     for (InputSplit s : splits) {
  105.       s.write(out);
  106.     }
  107.   }
  108.   /**
  109.    * {@inheritDoc}
  110.    * @throws IOException If the child InputSplit cannot be read, typically
  111.    *                     for faliing access checks.
  112.    */
  113.   @SuppressWarnings("unchecked")  // Generic array assignment
  114.   public void readFields(DataInput in) throws IOException {
  115.     int card = WritableUtils.readVInt(in);
  116.     if (splits == null || splits.length != card) {
  117.       splits = new InputSplit[card];
  118.     }
  119.     Class<? extends InputSplit>[] cls = new Class[card];
  120.     try {
  121.       for (int i = 0; i < card; ++i) {
  122.         cls[i] =
  123.           Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
  124.       }
  125.       for (int i = 0; i < card; ++i) {
  126.         splits[i] = ReflectionUtils.newInstance(cls[i], null);
  127.         splits[i].readFields(in);
  128.       }
  129.     } catch (ClassNotFoundException e) {
  130.       throw (IOException)new IOException("Failed split init").initCause(e);
  131.     }
  132.   }
  133. }