001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.BufferedInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    
026    
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.fs.Seekable;
030    import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
031    import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
032    import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
033    import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
034    import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
035    
036    /**
037     * This class provides CompressionOutputStream and CompressionInputStream for
038     * compression and decompression. Currently we dont have an implementation of
039     * the Compressor and Decompressor interfaces, so those methods of
040     * CompressionCodec which have a Compressor or Decompressor type argument, throw
041     * UnsupportedOperationException.
042     */
043    @InterfaceAudience.Public
044    @InterfaceStability.Evolving
045    public class BZip2Codec implements SplittableCompressionCodec {
046    
047      private static final String HEADER = "BZ";
048      private static final int HEADER_LEN = HEADER.length();
049      private static final String SUB_HEADER = "h9";
050      private static final int SUB_HEADER_LEN = SUB_HEADER.length();
051    
052      /**
053      * Creates a new instance of BZip2Codec
054      */
055      public BZip2Codec() { }
056    
057      /**
058      * Creates CompressionOutputStream for BZip2
059      *
060      * @param out
061      *            The output Stream
062      * @return The BZip2 CompressionOutputStream
063      * @throws java.io.IOException
064      *             Throws IO exception
065      */
066      @Override
067      public CompressionOutputStream createOutputStream(OutputStream out)
068          throws IOException {
069        return new BZip2CompressionOutputStream(out);
070      }
071    
072      /**
073      * Creates a compressor using given OutputStream.
074       *
075      * @return CompressionOutputStream
076        @throws java.io.IOException
077       */
078      @Override
079      public CompressionOutputStream createOutputStream(OutputStream out,
080          Compressor compressor) throws IOException {
081        return createOutputStream(out);
082      }
083    
084      /**
085      * This functionality is currently not supported.
086      *
087      * @return BZip2DummyCompressor.class
088      */
089      @Override
090      public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
091        return BZip2DummyCompressor.class;
092      }
093    
094      /**
095      * This functionality is currently not supported.
096      *
097      * @return Compressor
098      */
099      @Override
100      public Compressor createCompressor() {
101        return new BZip2DummyCompressor();
102      }
103    
104      /**
105      * Creates CompressionInputStream to be used to read off uncompressed data.
106      *
107      * @param in
108      *            The InputStream
109      * @return Returns CompressionInputStream for BZip2
110      * @throws java.io.IOException
111      *             Throws IOException
112      */
113      @Override
114      public CompressionInputStream createInputStream(InputStream in)
115          throws IOException {
116        return new BZip2CompressionInputStream(in);
117      }
118    
119      /**
120      * This functionality is currently not supported.
121      *
122      * @return CompressionInputStream
123      */
124      @Override
125      public CompressionInputStream createInputStream(InputStream in,
126          Decompressor decompressor) throws IOException {
127        return createInputStream(in);
128      }
129    
130      /**
131       * Creates CompressionInputStream to be used to read off uncompressed data
132       * in one of the two reading modes. i.e. Continuous or Blocked reading modes
133       *
134       * @param seekableIn The InputStream
135       * @param start The start offset into the compressed stream
136       * @param end The end offset into the compressed stream
137       * @param readMode Controls whether progress is reported continuously or
138       *                 only at block boundaries.
139       *
140       * @return CompressionInputStream for BZip2 aligned at block boundaries
141       */
142      @Override
143      public SplitCompressionInputStream createInputStream(InputStream seekableIn,
144          Decompressor decompressor, long start, long end, READ_MODE readMode)
145          throws IOException {
146    
147        if (!(seekableIn instanceof Seekable)) {
148          throw new IOException("seekableIn must be an instance of " +
149              Seekable.class.getName());
150        }
151    
152        //find the position of first BZip2 start up marker
153        ((Seekable)seekableIn).seek(0);
154    
155        // BZip2 start of block markers are of 6 bytes.  But the very first block
156        // also has "BZh9", making it 10 bytes.  This is the common case.  But at
157        // time stream might start without a leading BZ.
158        final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
159          CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
160        long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
161    
162        ((Seekable)seekableIn).seek(adjStart);
163        SplitCompressionInputStream in =
164          new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
165    
166    
167        // The following if clause handles the following case:
168        // Assume the following scenario in BZip2 compressed stream where
169        // . represent compressed data.
170        // .....[48 bit Block].....[48 bit   Block].....[48 bit Block]...
171        // ........................[47 bits][1 bit].....[48 bit Block]...
172        // ................................^[Assume a Byte alignment here]
173        // ........................................^^[current position of stream]
174        // .....................^^[We go back 10 Bytes in stream and find a Block marker]
175        // ........................................^^[We align at wrong position!]
176        // ...........................................................^^[While this pos is correct]
177    
178        if (in.getPos() <= start) {
179          ((Seekable)seekableIn).seek(start);
180          in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
181        }
182    
183        return in;
184      }
185    
186      /**
187      * This functionality is currently not supported.
188      *
189      * @return BZip2DummyDecompressor.class
190      */
191      @Override
192      public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
193        return BZip2DummyDecompressor.class;
194      }
195    
196      /**
197      * This functionality is currently not supported.
198      *
199      * @return Decompressor
200      */
201      @Override
202      public Decompressor createDecompressor() {
203        return new BZip2DummyDecompressor();
204      }
205    
206      /**
207      * .bz2 is recognized as the default extension for compressed BZip2 files
208      *
209      * @return A String telling the default bzip2 file extension
210      */
211      @Override
212      public String getDefaultExtension() {
213        return ".bz2";
214      }
215    
216      private static class BZip2CompressionOutputStream extends
217          CompressionOutputStream {
218    
219        // class data starts here//
220        private CBZip2OutputStream output;
221        private boolean needsReset; 
222        // class data ends here//
223    
224        public BZip2CompressionOutputStream(OutputStream out)
225            throws IOException {
226          super(out);
227          needsReset = true;
228        }
229    
230        private void writeStreamHeader() throws IOException {
231          if (super.out != null) {
232            // The compressed bzip2 stream should start with the
233            // identifying characters BZ. Caller of CBZip2OutputStream
234            // i.e. this class must write these characters.
235            out.write(HEADER.getBytes());
236          }
237        }
238    
239        @Override
240        public void finish() throws IOException {
241          if (needsReset) {
242            // In the case that nothing is written to this stream, we still need to
243            // write out the header before closing, otherwise the stream won't be
244            // recognized by BZip2CompressionInputStream.
245            internalReset();
246          }
247          this.output.finish();
248          needsReset = true;
249        }
250    
251        private void internalReset() throws IOException {
252          if (needsReset) {
253            needsReset = false;
254            writeStreamHeader();
255            this.output = new CBZip2OutputStream(out);
256          }
257        }    
258        
259        @Override
260        public void resetState() throws IOException {
261          // Cannot write to out at this point because out might not be ready
262          // yet, as in SequenceFile.Writer implementation.
263          needsReset = true;
264        }
265    
266        @Override
267        public void write(int b) throws IOException {
268          if (needsReset) {
269            internalReset();
270          }
271          this.output.write(b);
272        }
273    
274        @Override
275        public void write(byte[] b, int off, int len) throws IOException {
276          if (needsReset) {
277            internalReset();
278          }
279          this.output.write(b, off, len);
280        }
281    
282        @Override
283        public void close() throws IOException {
284          if (needsReset) {
285            // In the case that nothing is written to this stream, we still need to
286            // write out the header before closing, otherwise the stream won't be
287            // recognized by BZip2CompressionInputStream.
288            internalReset();
289          }
290          this.output.flush();
291          this.output.close();
292          needsReset = true;
293        }
294    
295      }// end of class BZip2CompressionOutputStream
296    
297      /**
298       * This class is capable to de-compress BZip2 data in two modes;
299       * CONTINOUS and BYBLOCK.  BYBLOCK mode makes it possible to
300       * do decompression starting any arbitrary position in the stream.
301       *
302       * So this facility can easily be used to parallelize decompression
303       * of a large BZip2 file for performance reasons.  (It is exactly
304       * done so for Hadoop framework.  See LineRecordReader for an
305       * example).  So one can break the file (of course logically) into
306       * chunks for parallel processing.  These "splits" should be like
307       * default Hadoop splits (e.g as in FileInputFormat getSplit metod).
308       * So this code is designed and tested for FileInputFormat's way
309       * of splitting only.
310       */
311    
312      private static class BZip2CompressionInputStream extends
313          SplitCompressionInputStream {
314    
315        // class data starts here//
316        private CBZip2InputStream input;
317        boolean needsReset;
318        private BufferedInputStream bufferedIn;
319        private boolean isHeaderStripped = false;
320        private boolean isSubHeaderStripped = false;
321        private READ_MODE readMode = READ_MODE.CONTINUOUS;
322        private long startingPos = 0L;
323    
324        // Following state machine handles different states of compressed stream
325        // position
326        // HOLD : Don't advertise compressed stream position
327        // ADVERTISE : Read 1 more character and advertise stream position
328        // See more comments about it before updatePos method.
329        private enum POS_ADVERTISEMENT_STATE_MACHINE {
330          HOLD, ADVERTISE
331        };
332    
333        POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
334        long compressedStreamPosition = 0;
335    
336        // class data ends here//
337    
338        public BZip2CompressionInputStream(InputStream in) throws IOException {
339          this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
340        }
341    
342        public BZip2CompressionInputStream(InputStream in, long start, long end,
343            READ_MODE readMode) throws IOException {
344          super(in, start, end);
345          needsReset = false;
346          bufferedIn = new BufferedInputStream(super.in);
347          this.startingPos = super.getPos();
348          this.readMode = readMode;
349          if (this.startingPos == 0) {
350            // We only strip header if it is start of file
351            bufferedIn = readStreamHeader();
352          }
353          input = new CBZip2InputStream(bufferedIn, readMode);
354          if (this.isHeaderStripped) {
355            input.updateReportedByteCount(HEADER_LEN);
356          }
357    
358          if (this.isSubHeaderStripped) {
359            input.updateReportedByteCount(SUB_HEADER_LEN);
360          }
361    
362          this.updatePos(false);
363        }
364    
365        private BufferedInputStream readStreamHeader() throws IOException {
366          // We are flexible enough to allow the compressed stream not to
367          // start with the header of BZ. So it works fine either we have
368          // the header or not.
369          if (super.in != null) {
370            bufferedIn.mark(HEADER_LEN);
371            byte[] headerBytes = new byte[HEADER_LEN];
372            int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
373            if (actualRead != -1) {
374              String header = new String(headerBytes);
375              if (header.compareTo(HEADER) != 0) {
376                bufferedIn.reset();
377              } else {
378                this.isHeaderStripped = true;
379                // In case of BYBLOCK mode, we also want to strip off
380                // remaining two character of the header.
381                if (this.readMode == READ_MODE.BYBLOCK) {
382                  actualRead = bufferedIn.read(headerBytes, 0,
383                      SUB_HEADER_LEN);
384                  if (actualRead != -1) {
385                    this.isSubHeaderStripped = true;
386                  }
387                }
388              }
389            }
390          }
391    
392          if (bufferedIn == null) {
393            throw new IOException("Failed to read bzip2 stream.");
394          }
395    
396          return bufferedIn;
397    
398        }// end of method
399    
400        @Override
401        public void close() throws IOException {
402          if (!needsReset) {
403            input.close();
404            needsReset = true;
405          }
406        }
407    
408        /**
409        * This method updates compressed stream position exactly when the
410        * client of this code has read off at least one byte passed any BZip2
411        * end of block marker.
412        *
413        * This mechanism is very helpful to deal with data level record
414        * boundaries. Please see constructor and next methods of
415        * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
416        * feature.  We elaborate it with an example in the following:
417        *
418        * Assume two different scenarios of the BZip2 compressed stream, where
419        * [m] represent end of block, \n is line delimiter and . represent compressed
420        * data.
421        *
422        * ............[m]......\n.......
423        *
424        * ..........\n[m]......\n.......
425        *
426        * Assume that end is right after [m].  In the first case the reading
427        * will stop at \n and there is no need to read one more line.  (To see the
428        * reason of reading one more line in the next() method is explained in LineRecordReader.)
429        * While in the second example LineRecordReader needs to read one more line
430        * (till the second \n).  Now since BZip2Codecs only update position
431        * at least one byte passed a maker, so it is straight forward to differentiate
432        * between the two cases mentioned.
433        *
434        */
435    
436        @Override
437        public int read(byte[] b, int off, int len) throws IOException {
438          if (needsReset) {
439            internalReset();
440          }
441    
442          int result = 0;
443          result = this.input.read(b, off, len);
444          if (result == BZip2Constants.END_OF_BLOCK) {
445            this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
446          }
447    
448          if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
449            result = this.input.read(b, off, off + 1);
450            // This is the precise time to update compressed stream position
451            // to the client of this code.
452            this.updatePos(true);
453            this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
454          }
455    
456          return result;
457    
458        }
459    
460        @Override
461        public int read() throws IOException {
462          byte b[] = new byte[1];
463          int result = this.read(b, 0, 1);
464          return (result < 0) ? result : (b[0] & 0xff);
465        }
466    
467        private void internalReset() throws IOException {
468          if (needsReset) {
469            needsReset = false;
470            BufferedInputStream bufferedIn = readStreamHeader();
471            input = new CBZip2InputStream(bufferedIn, this.readMode);
472          }
473        }    
474        
475        @Override
476        public void resetState() throws IOException {
477          // Cannot read from bufferedIn at this point because bufferedIn
478          // might not be ready
479          // yet, as in SequenceFile.Reader implementation.
480          needsReset = true;
481        }
482    
483        @Override
484        public long getPos() {
485          return this.compressedStreamPosition;
486          }
487    
488        /*
489         * As the comments before read method tell that
490         * compressed stream is advertised when at least
491         * one byte passed EOB have been read off.  But
492         * there is an exception to this rule.  When we
493         * construct the stream we advertise the position
494         * exactly at EOB.  In the following method
495         * shouldAddOn boolean captures this exception.
496         *
497         */
498        private void updatePos(boolean shouldAddOn) {
499          int addOn = shouldAddOn ? 1 : 0;
500          this.compressedStreamPosition = this.startingPos
501              + this.input.getProcessedByteCount() + addOn;
502        }
503    
504      }// end of BZip2CompressionInputStream
505    
506    }