BinaryProtocol.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.*;
  20. import java.net.Socket;
  21. import java.util.ArrayList;
  22. import java.util.List;
  23. import java.util.Map;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.io.BytesWritable;
  27. import org.apache.hadoop.io.DataOutputBuffer;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.io.Writable;
  30. import org.apache.hadoop.io.WritableComparable;
  31. import org.apache.hadoop.io.WritableUtils;
  32. import org.apache.hadoop.mapred.InputSplit;
  33. import org.apache.hadoop.mapred.JobConf;
  34. import org.apache.hadoop.util.StringUtils;
  35. /**
  36.  * This protocol is a binary implementation of the Pipes protocol.
  37.  */
  38. class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
  39.                      K2 extends WritableComparable, V2 extends Writable>
  40.   implements DownwardProtocol<K1, V1> {
  41.   
  42.   public static final int CURRENT_PROTOCOL_VERSION = 0;
  43.   /**
  44.    * The buffer size for the command socket
  45.    */
  46.   private static final int BUFFER_SIZE = 128*1024;
  47.   private DataOutputStream stream;
  48.   private DataOutputBuffer buffer = new DataOutputBuffer();
  49.   private static final Log LOG = 
  50.     LogFactory.getLog(BinaryProtocol.class.getName());
  51.   private UplinkReaderThread uplink;
  52.   /**
  53.    * The integer codes to represent the different messages. These must match
  54.    * the C++ codes or massive confusion will result.
  55.    */
  56.   private static enum MessageType { START(0),
  57.                                     SET_JOB_CONF(1),
  58.                                     SET_INPUT_TYPES(2),
  59.                                     RUN_MAP(3),
  60.                                     MAP_ITEM(4),
  61.                                     RUN_REDUCE(5),
  62.                                     REDUCE_KEY(6),
  63.                                     REDUCE_VALUE(7),
  64.                                     CLOSE(8),
  65.                                     ABORT(9),
  66.                                     OUTPUT(50),
  67.                                     PARTITIONED_OUTPUT(51),
  68.                                     STATUS(52),
  69.                                     PROGRESS(53),
  70.                                     DONE(54),
  71.                                     REGISTER_COUNTER(55),
  72.                                     INCREMENT_COUNTER(56);
  73.     final int code;
  74.     MessageType(int code) {
  75.       this.code = code;
  76.     }
  77.   }
  78.   private static class UplinkReaderThread<K2 extends WritableComparable,
  79.                                           V2 extends Writable>  
  80.     extends Thread {
  81.     
  82.     private DataInputStream inStream;
  83.     private UpwardProtocol<K2, V2> handler;
  84.     private K2 key;
  85.     private V2 value;
  86.     
  87.     public UplinkReaderThread(InputStream stream,
  88.                               UpwardProtocol<K2, V2> handler, 
  89.                               K2 key, V2 value) throws IOException{
  90.       inStream = new DataInputStream(new BufferedInputStream(stream, 
  91.                                                              BUFFER_SIZE));
  92.       this.handler = handler;
  93.       this.key = key;
  94.       this.value = value;
  95.     }
  96.     public void closeConnection() throws IOException {
  97.       inStream.close();
  98.     }
  99.     public void run() {
  100.       while (true) {
  101.         try {
  102.           if (Thread.currentThread().isInterrupted()) {
  103.             throw new InterruptedException();
  104.           }
  105.           int cmd = WritableUtils.readVInt(inStream);
  106.           LOG.debug("Handling uplink command " + cmd);
  107.           if (cmd == MessageType.OUTPUT.code) {
  108.             readObject(key);
  109.             readObject(value);
  110.             handler.output(key, value);
  111.           } else if (cmd == MessageType.PARTITIONED_OUTPUT.code) {
  112.             int part = WritableUtils.readVInt(inStream);
  113.             readObject(key);
  114.             readObject(value);
  115.             handler.partitionedOutput(part, key, value);
  116.           } else if (cmd == MessageType.STATUS.code) {
  117.             handler.status(Text.readString(inStream));
  118.           } else if (cmd == MessageType.PROGRESS.code) {
  119.             handler.progress(inStream.readFloat());
  120.           } else if (cmd == MessageType.REGISTER_COUNTER.code) {
  121.             int id = WritableUtils.readVInt(inStream);
  122.             String group = Text.readString(inStream);
  123.             String name = Text.readString(inStream);
  124.             handler.registerCounter(id, group, name);
  125.           } else if (cmd == MessageType.INCREMENT_COUNTER.code) {
  126.             int id = WritableUtils.readVInt(inStream);
  127.             long amount = WritableUtils.readVLong(inStream);
  128.             handler.incrementCounter(id, amount);
  129.           } else if (cmd == MessageType.DONE.code) {
  130.             LOG.debug("Pipe child done");
  131.             handler.done();
  132.             return;
  133.           } else {
  134.             throw new IOException("Bad command code: " + cmd);
  135.           }
  136.         } catch (InterruptedException e) {
  137.           return;
  138.         } catch (Throwable e) {
  139.           LOG.error(StringUtils.stringifyException(e));
  140.           handler.failed(e);
  141.           return;
  142.         }
  143.       }
  144.     }
  145.     
  146.     private void readObject(Writable obj) throws IOException {
  147.       int numBytes = WritableUtils.readVInt(inStream);
  148.       byte[] buffer;
  149.       // For BytesWritable and Text, use the specified length to set the length
  150.       // this causes the "obvious" translations to work. So that if you emit
  151.       // a string "abc" from C++, it shows up as "abc".
  152.       if (obj instanceof BytesWritable) {
  153.         buffer = new byte[numBytes];
  154.         inStream.readFully(buffer);
  155.         ((BytesWritable) obj).set(buffer, 0, numBytes);
  156.       } else if (obj instanceof Text) {
  157.         buffer = new byte[numBytes];
  158.         inStream.readFully(buffer);
  159.         ((Text) obj).set(buffer);
  160.       } else {
  161.         obj.readFields(inStream);
  162.       }
  163.     }
  164.   }
  165.   /**
  166.    * An output stream that will save a copy of the data into a file.
  167.    */
  168.   private static class TeeOutputStream extends FilterOutputStream {
  169.     private OutputStream file;
  170.     TeeOutputStream(String filename, OutputStream base) throws IOException {
  171.       super(base);
  172.       file = new FileOutputStream(filename);
  173.     }
  174.     public void write(byte b[], int off, int len) throws IOException {
  175.       file.write(b,off,len);
  176.       out.write(b,off,len);
  177.     }
  178.     public void write(int b) throws IOException {
  179.       file.write(b);
  180.       out.write(b);
  181.     }
  182.     public void flush() throws IOException {
  183.       file.flush();
  184.       out.flush();
  185.     }
  186.     public void close() throws IOException {
  187.       flush();
  188.       file.close();
  189.       out.close();
  190.     }
  191.   }
  192.   /**
  193.    * Create a proxy object that will speak the binary protocol on a socket.
  194.    * Upward messages are passed on the specified handler and downward
  195.    * downward messages are public methods on this object.
  196.    * @param sock The socket to communicate on.
  197.    * @param handler The handler for the received messages.
  198.    * @param key The object to read keys into.
  199.    * @param value The object to read values into.
  200.    * @param config The job's configuration
  201.    * @throws IOException
  202.    */
  203.   public BinaryProtocol(Socket sock, 
  204.                         UpwardProtocol<K2, V2> handler,
  205.                         K2 key,
  206.                         V2 value,
  207.                         JobConf config) throws IOException {
  208.     OutputStream raw = sock.getOutputStream();
  209.     // If we are debugging, save a copy of the downlink commands to a file
  210.     if (Submitter.getKeepCommandFile(config)) {
  211.       raw = new TeeOutputStream("downlink.data", raw);
  212.     }
  213.     stream = new DataOutputStream(new BufferedOutputStream(raw, 
  214.                                                            BUFFER_SIZE)) ;
  215.     uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
  216.                                             handler, key, value);
  217.     uplink.setName("pipe-uplink-handler");
  218.     uplink.start();
  219.   }
  220.   /**
  221.    * Close the connection and shutdown the handler thread.
  222.    * @throws IOException
  223.    * @throws InterruptedException
  224.    */
  225.   public void close() throws IOException, InterruptedException {
  226.     LOG.debug("closing connection");
  227.     stream.close();
  228.     uplink.closeConnection();
  229.     uplink.interrupt();
  230.     uplink.join();
  231.   }
  232.   public void start() throws IOException {
  233.     LOG.debug("starting downlink");
  234.     WritableUtils.writeVInt(stream, MessageType.START.code);
  235.     WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
  236.   }
  237.   public void setJobConf(JobConf job) throws IOException {
  238.     WritableUtils.writeVInt(stream, MessageType.SET_JOB_CONF.code);
  239.     List<String> list = new ArrayList<String>();
  240.     for(Map.Entry<String, String> itm: job) {
  241.       list.add(itm.getKey());
  242.       list.add(itm.getValue());
  243.     }
  244.     WritableUtils.writeVInt(stream, list.size());
  245.     for(String entry: list){
  246.       Text.writeString(stream, entry);
  247.     }
  248.   }
  249.   public void setInputTypes(String keyType, 
  250.                             String valueType) throws IOException {
  251.     WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code);
  252.     Text.writeString(stream, keyType);
  253.     Text.writeString(stream, valueType);
  254.   }
  255.   public void runMap(InputSplit split, int numReduces, 
  256.                      boolean pipedInput) throws IOException {
  257.     WritableUtils.writeVInt(stream, MessageType.RUN_MAP.code);
  258.     writeObject(split);
  259.     WritableUtils.writeVInt(stream, numReduces);
  260.     WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
  261.   }
  262.   public void mapItem(WritableComparable key, 
  263.                       Writable value) throws IOException {
  264.     WritableUtils.writeVInt(stream, MessageType.MAP_ITEM.code);
  265.     writeObject(key);
  266.     writeObject(value);
  267.   }
  268.   public void runReduce(int reduce, boolean pipedOutput) throws IOException {
  269.     WritableUtils.writeVInt(stream, MessageType.RUN_REDUCE.code);
  270.     WritableUtils.writeVInt(stream, reduce);
  271.     WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
  272.   }
  273.   public void reduceKey(WritableComparable key) throws IOException {
  274.     WritableUtils.writeVInt(stream, MessageType.REDUCE_KEY.code);
  275.     writeObject(key);
  276.   }
  277.   public void reduceValue(Writable value) throws IOException {
  278.     WritableUtils.writeVInt(stream, MessageType.REDUCE_VALUE.code);
  279.     writeObject(value);
  280.   }
  281.   public void endOfInput() throws IOException {
  282.     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
  283.     LOG.debug("Sent close command");
  284.   }
  285.   
  286.   public void abort() throws IOException {
  287.     WritableUtils.writeVInt(stream, MessageType.ABORT.code);
  288.     LOG.debug("Sent abort command");
  289.   }
  290.   public void flush() throws IOException {
  291.     stream.flush();
  292.   }
  293.   /**
  294.    * Write the given object to the stream. If it is a Text or BytesWritable,
  295.    * write it directly. Otherwise, write it to a buffer and then write the
  296.    * length and data to the stream.
  297.    * @param obj the object to write
  298.    * @throws IOException
  299.    */
  300.   private void writeObject(Writable obj) throws IOException {
  301.     // For Text and BytesWritable, encode them directly, so that they end up
  302.     // in C++ as the natural translations.
  303.     if (obj instanceof Text) {
  304.       Text t = (Text) obj;
  305.       int len = t.getLength();
  306.       WritableUtils.writeVInt(stream, len);
  307.       stream.write(t.getBytes(), 0, len);
  308.     } else if (obj instanceof BytesWritable) {
  309.       BytesWritable b = (BytesWritable) obj;
  310.       int len = b.getLength();
  311.       WritableUtils.writeVInt(stream, len);
  312.       stream.write(b.getBytes(), 0, len);
  313.     } else {
  314.       buffer.reset();
  315.       obj.write(buffer);
  316.       int length = buffer.getLength();
  317.       WritableUtils.writeVInt(stream, length);
  318.       stream.write(buffer.getData(), 0, length);
  319.     }
  320.   }
  321.   
  322. }