CsvRecordInput.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.record;
  19. import java.io.InputStreamReader;
  20. import java.io.InputStream;
  21. import java.io.IOException;
  22. import java.io.PushbackReader;
  23. import java.io.UnsupportedEncodingException;
  24. /**
  25.  */
  26. public class CsvRecordInput implements RecordInput {
  27.     
  28.   private PushbackReader stream;
  29.     
  30.   private class CsvIndex implements Index {
  31.     public boolean done() {
  32.       char c = '';
  33.       try {
  34.         c = (char) stream.read();
  35.         stream.unread(c);
  36.       } catch (IOException ex) {
  37.       }
  38.       return (c == '}') ? true : false;
  39.     }
  40.     public void incr() {}
  41.   }
  42.     
  43.   private void throwExceptionOnError(String tag) throws IOException {
  44.     throw new IOException("Error deserializing "+tag);
  45.   }
  46.     
  47.   private String readField(String tag) throws IOException {
  48.     try {
  49.       StringBuffer buf = new StringBuffer();
  50.       while (true) {
  51.         char c = (char) stream.read();
  52.         switch (c) {
  53.         case ',':
  54.           return buf.toString();
  55.         case '}':
  56.         case 'n':
  57.         case 'r':
  58.           stream.unread(c);
  59.           return buf.toString();
  60.         default:
  61.           buf.append(c);
  62.         }
  63.       }
  64.     } catch (IOException ex) {
  65.       throw new IOException("Error reading "+tag);
  66.     }
  67.   }
  68.     
  69.   /** Creates a new instance of CsvRecordInput */
  70.   public CsvRecordInput(InputStream in) {
  71.     try {
  72.       stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
  73.     } catch (UnsupportedEncodingException ex) {
  74.       throw new RuntimeException(ex);
  75.     }
  76.   }
  77.     
  78.   public byte readByte(String tag) throws IOException {
  79.     return (byte) readLong(tag);
  80.   }
  81.     
  82.   public boolean readBool(String tag) throws IOException {
  83.     String sval = readField(tag);
  84.     return "T".equals(sval) ? true : false;
  85.   }
  86.     
  87.   public int readInt(String tag) throws IOException {
  88.     return (int) readLong(tag);
  89.   }
  90.     
  91.   public long readLong(String tag) throws IOException {
  92.     String sval = readField(tag);
  93.     try {
  94.       long lval = Long.parseLong(sval);
  95.       return lval;
  96.     } catch (NumberFormatException ex) {
  97.       throw new IOException("Error deserializing "+tag);
  98.     }
  99.   }
  100.     
  101.   public float readFloat(String tag) throws IOException {
  102.     return (float) readDouble(tag);
  103.   }
  104.     
  105.   public double readDouble(String tag) throws IOException {
  106.     String sval = readField(tag);
  107.     try {
  108.       double dval = Double.parseDouble(sval);
  109.       return dval;
  110.     } catch (NumberFormatException ex) {
  111.       throw new IOException("Error deserializing "+tag);
  112.     }
  113.   }
  114.     
  115.   public String readString(String tag) throws IOException {
  116.     String sval = readField(tag);
  117.     return Utils.fromCSVString(sval);
  118.   }
  119.     
  120.   public Buffer readBuffer(String tag) throws IOException {
  121.     String sval = readField(tag);
  122.     return Utils.fromCSVBuffer(sval);
  123.   }
  124.     
  125.   public void startRecord(String tag) throws IOException {
  126.     if (tag != null && !"".equals(tag)) {
  127.       char c1 = (char) stream.read();
  128.       char c2 = (char) stream.read();
  129.       if (c1 != 's' || c2 != '{') {
  130.         throw new IOException("Error deserializing "+tag);
  131.       }
  132.     }
  133.   }
  134.     
  135.   public void endRecord(String tag) throws IOException {
  136.     char c = (char) stream.read();
  137.     if (tag == null || "".equals(tag)) {
  138.       if (c != 'n' && c != 'r') {
  139.         throw new IOException("Error deserializing record.");
  140.       } else {
  141.         return;
  142.       }
  143.     }
  144.         
  145.     if (c != '}') {
  146.       throw new IOException("Error deserializing "+tag);
  147.     }
  148.     c = (char) stream.read();
  149.     if (c != ',') {
  150.       stream.unread(c);
  151.     }
  152.         
  153.     return;
  154.   }
  155.     
  156.   public Index startVector(String tag) throws IOException {
  157.     char c1 = (char) stream.read();
  158.     char c2 = (char) stream.read();
  159.     if (c1 != 'v' || c2 != '{') {
  160.       throw new IOException("Error deserializing "+tag);
  161.     }
  162.     return new CsvIndex();
  163.   }
  164.     
  165.   public void endVector(String tag) throws IOException {
  166.     char c = (char) stream.read();
  167.     if (c != '}') {
  168.       throw new IOException("Error deserializing "+tag);
  169.     }
  170.     c = (char) stream.read();
  171.     if (c != ',') {
  172.       stream.unread(c);
  173.     }
  174.     return;
  175.   }
  176.     
  177.   public Index startMap(String tag) throws IOException {
  178.     char c1 = (char) stream.read();
  179.     char c2 = (char) stream.read();
  180.     if (c1 != 'm' || c2 != '{') {
  181.       throw new IOException("Error deserializing "+tag);
  182.     }
  183.     return new CsvIndex();
  184.   }
  185.     
  186.   public void endMap(String tag) throws IOException {
  187.     char c = (char) stream.read();
  188.     if (c != '}') {
  189.       throw new IOException("Error deserializing "+tag);
  190.     }
  191.     c = (char) stream.read();
  192.     if (c != ',') {
  193.       stream.unread(c);
  194.     }
  195.     return;
  196.   }
  197. }