001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.io.compress;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.util.ReflectionUtils;
031    
032    /**
033     * A global compressor/decompressor pool used to save and reuse 
034     * (possibly native) compression/decompression codecs.
035     */
036    @InterfaceAudience.Public
037    @InterfaceStability.Evolving
038    public class CodecPool {
039      private static final Log LOG = LogFactory.getLog(CodecPool.class);
040      
041      /**
042       * A global compressor pool used to save the expensive 
043       * construction/destruction of (possibly native) decompression codecs.
044       */
045      private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
046        new HashMap<Class<Compressor>, List<Compressor>>();
047      
048      /**
049       * A global decompressor pool used to save the expensive 
050       * construction/destruction of (possibly native) decompression codecs.
051       */
052      private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
053        new HashMap<Class<Decompressor>, List<Decompressor>>();
054    
055      private static <T> T borrow(Map<Class<T>, List<T>> pool,
056                                 Class<? extends T> codecClass) {
057        T codec = null;
058        
059        // Check if an appropriate codec is available
060        synchronized (pool) {
061          if (pool.containsKey(codecClass)) {
062            List<T> codecList = pool.get(codecClass);
063            
064            if (codecList != null) {
065              synchronized (codecList) {
066                if (!codecList.isEmpty()) {
067                  codec = codecList.remove(codecList.size()-1);
068                }
069              }
070            }
071          }
072        }
073        
074        return codec;
075      }
076    
077      private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
078        if (codec != null) {
079          Class<T> codecClass = ReflectionUtils.getClass(codec);
080          synchronized (pool) {
081            if (!pool.containsKey(codecClass)) {
082              pool.put(codecClass, new ArrayList<T>());
083            }
084    
085            List<T> codecList = pool.get(codecClass);
086            synchronized (codecList) {
087              codecList.add(codec);
088            }
089          }
090        }
091      }
092      
093      /**
094       * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
095       * pool or a new one.
096       *
097       * @param codec the <code>CompressionCodec</code> for which to get the 
098       *              <code>Compressor</code>
099       * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
100       * @return <code>Compressor</code> for the given 
101       *         <code>CompressionCodec</code> from the pool or a new one
102       */
103      public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
104        Compressor compressor = borrow(compressorPool, codec.getCompressorType());
105        if (compressor == null) {
106          compressor = codec.createCompressor();
107          LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
108        } else {
109          compressor.reinit(conf);
110          if(LOG.isDebugEnabled()) {
111            LOG.debug("Got recycled compressor");
112          }
113        }
114        return compressor;
115      }
116      
117      public static Compressor getCompressor(CompressionCodec codec) {
118        return getCompressor(codec, null);
119      }
120      
121      /**
122       * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
123       * pool or a new one.
124       *  
125       * @param codec the <code>CompressionCodec</code> for which to get the 
126       *              <code>Decompressor</code>
127       * @return <code>Decompressor</code> for the given 
128       *         <code>CompressionCodec</code> the pool or a new one
129       */
130      public static Decompressor getDecompressor(CompressionCodec codec) {
131        Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
132        if (decompressor == null) {
133          decompressor = codec.createDecompressor();
134          LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
135        } else {
136          if(LOG.isDebugEnabled()) {
137            LOG.debug("Got recycled decompressor");
138          }
139        }
140        return decompressor;
141      }
142      
143      /**
144       * Return the {@link Compressor} to the pool.
145       * 
146       * @param compressor the <code>Compressor</code> to be returned to the pool
147       */
148      public static void returnCompressor(Compressor compressor) {
149        if (compressor == null) {
150          return;
151        }
152        // if the compressor can't be reused, don't pool it.
153        if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
154          return;
155        }
156        compressor.reset();
157        payback(compressorPool, compressor);
158      }
159      
160      /**
161       * Return the {@link Decompressor} to the pool.
162       * 
163       * @param decompressor the <code>Decompressor</code> to be returned to the 
164       *                     pool
165       */
166      public static void returnDecompressor(Decompressor decompressor) {
167        if (decompressor == null) {
168          return;
169        }
170        // if the decompressor can't be reused, don't pool it.
171        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
172          return;
173        }
174        decompressor.reset();
175        payback(decompressorPool, decompressor);
176      }
177    }