001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.io.InputStream;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    
028    /**
029     * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
030     * with 'block-based' based compression algorithms, as opposed to 
031     * 'stream-based' compression algorithms.
032     *  
033     */
034    @InterfaceAudience.Public
035    @InterfaceStability.Evolving
036    public class BlockDecompressorStream extends DecompressorStream {
037      private int originalBlockSize = 0;
038      private int noUncompressedBytes = 0;
039    
040      /**
041       * Create a {@link BlockDecompressorStream}.
042       * 
043       * @param in input stream
044       * @param decompressor decompressor to use
045       * @param bufferSize size of buffer
046       * @throws IOException
047       */
048      public BlockDecompressorStream(InputStream in, Decompressor decompressor, 
049                                     int bufferSize) throws IOException {
050        super(in, decompressor, bufferSize);
051      }
052    
053      /**
054       * Create a {@link BlockDecompressorStream}.
055       * 
056       * @param in input stream
057       * @param decompressor decompressor to use
058       * @throws IOException
059       */
060      public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
061        super(in, decompressor);
062      }
063    
064      protected BlockDecompressorStream(InputStream in) throws IOException {
065        super(in);
066      }
067    
068      @Override
069      protected int decompress(byte[] b, int off, int len) throws IOException {
070        // Check if we are the beginning of a block
071        if (noUncompressedBytes == originalBlockSize) {
072          // Get original data size
073          try {
074            originalBlockSize =  rawReadInt();
075          } catch (IOException ioe) {
076            return -1;
077          }
078          noUncompressedBytes = 0;
079          // EOF if originalBlockSize is 0
080          // This will occur only when decompressing previous compressed empty file
081          if (originalBlockSize == 0) {
082            eof = true;
083            return -1;
084          }
085        }
086    
087        int n = 0;
088        while ((n = decompressor.decompress(b, off, len)) == 0) {
089          if (decompressor.finished() || decompressor.needsDictionary()) {
090            if (noUncompressedBytes >= originalBlockSize) {
091              eof = true;
092              return -1;
093            }
094          }
095          if (decompressor.needsInput()) {
096            int m = getCompressedData();
097            // Send the read data to the decompressor
098            decompressor.setInput(buffer, 0, m);
099          }
100        }
101    
102        // Note the no. of decompressed bytes read from 'current' block
103        noUncompressedBytes += n;
104    
105        return n;
106      }
107    
108      @Override
109      protected int getCompressedData() throws IOException {
110        checkStream();
111    
112        // Get the size of the compressed chunk (always non-negative)
113        int len = rawReadInt();
114    
115        // Read len bytes from underlying stream 
116        if (len > buffer.length) {
117          buffer = new byte[len];
118        }
119        int n = 0, off = 0;
120        while (n < len) {
121          int count = in.read(buffer, off + n, len - n);
122          if (count < 0) {
123            throw new EOFException("Unexpected end of block in input stream");
124          }
125          n += count;
126        }
127    
128        return len;
129      }
130    
131      @Override
132      public void resetState() throws IOException {
133        originalBlockSize = 0;
134        noUncompressedBytes = 0;
135        super.resetState();
136      }
137    
138      private int rawReadInt() throws IOException {
139        int b1 = in.read();
140        int b2 = in.read();
141        int b3 = in.read();
142        int b4 = in.read();
143        if ((b1 | b2 | b3 | b4) < 0)
144          throw new EOFException();
145        return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
146      }
147    }