001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs;
019    
020    import java.io.*;
021    import java.nio.ByteBuffer;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    
026    /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
027     * and buffers input through a {@link BufferedInputStream}. */
028    @InterfaceAudience.Public
029    @InterfaceStability.Stable
030    public class FSDataInputStream extends DataInputStream
031        implements Seekable, PositionedReadable, Closeable, ByteBufferReadable, HasFileDescriptor {
032    
033      public FSDataInputStream(InputStream in)
034        throws IOException {
035        super(in);
036        if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
037          throw new IllegalArgumentException(
038              "In is not an instance of Seekable or PositionedReadable");
039        }
040      }
041      
042      /**
043       * Seek to the given offset.
044       *
045       * @param desired offset to seek to
046       */
047      @Override
048      public synchronized void seek(long desired) throws IOException {
049        ((Seekable)in).seek(desired);
050      }
051    
052      /**
053       * Get the current position in the input stream.
054       *
055       * @return current position in the input stream
056       */
057      @Override
058      public long getPos() throws IOException {
059        return ((Seekable)in).getPos();
060      }
061      
062      /**
063       * Read bytes from the given position in the stream to the given buffer.
064       *
065       * @param position  position in the input stream to seek
066       * @param buffer    buffer into which data is read
067       * @param offset    offset into the buffer in which data is written
068       * @param length    maximum number of bytes to read
069       * @return total number of bytes read into the buffer, or <code>-1</code>
070       *         if there is no more data because the end of the stream has been
071       *         reached
072       */
073      @Override
074      public int read(long position, byte[] buffer, int offset, int length)
075        throws IOException {
076        return ((PositionedReadable)in).read(position, buffer, offset, length);
077      }
078    
079      /**
080       * Read bytes from the given position in the stream to the given buffer.
081       * Continues to read until <code>length</code> bytes have been read.
082       *
083       * @param position  position in the input stream to seek
084       * @param buffer    buffer into which data is read
085       * @param offset    offset into the buffer in which data is written
086       * @param length    the number of bytes to read
087       * @throws EOFException If the end of stream is reached while reading.
088       *                      If an exception is thrown an undetermined number
089       *                      of bytes in the buffer may have been written. 
090       */
091      @Override
092      public void readFully(long position, byte[] buffer, int offset, int length)
093        throws IOException {
094        ((PositionedReadable)in).readFully(position, buffer, offset, length);
095      }
096      
097      /**
098       * See {@link #readFully(long, byte[], int, int)}.
099       */
100      @Override
101      public void readFully(long position, byte[] buffer)
102        throws IOException {
103        ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
104      }
105      
106      /**
107       * Seek to the given position on an alternate copy of the data.
108       *
109       * @param  targetPos  position to seek to
110       * @return true if a new source is found, false otherwise
111       */
112      @Override
113      public boolean seekToNewSource(long targetPos) throws IOException {
114        return ((Seekable)in).seekToNewSource(targetPos); 
115      }
116      
117      /**
118       * Get a reference to the wrapped input stream. Used by unit tests.
119       *
120       * @return the underlying input stream
121       */
122      @InterfaceAudience.LimitedPrivate({"HDFS"})
123      public InputStream getWrappedStream() {
124        return in;
125      }
126    
127      @Override
128      public int read(ByteBuffer buf) throws IOException {
129        if (in instanceof ByteBufferReadable) {
130          return ((ByteBufferReadable)in).read(buf);
131        }
132    
133        throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
134      }
135    
136      @Override
137      public FileDescriptor getFileDescriptor() throws IOException {
138        if (in instanceof HasFileDescriptor) {
139          return ((HasFileDescriptor) in).getFileDescriptor();
140        } else if (in instanceof FileInputStream) {
141          return ((FileInputStream) in).getFD();
142        } else {
143          return null;
144        }
145      }
146    }