/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.lang.reflect.Constructor;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/**
 * A generic RecordReader that can hand out a different RecordReader
 * for each chunk in a {@link CombineFileSplit}.
 * A CombineFileSplit can combine data chunks from multiple files.
 * This class allows a different RecordReader to be used for the
 * data chunks coming from each of those files.
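 *
 * <p>Illustrative usage (the {@code MyCombineInputFormat} and
 * {@code MyChunkRecordReader} names below are hypothetical, not part of this
 * API): a {@link CombineFileInputFormat} subclass would typically return an
 * instance of this class from its {@code getRecordReader} method, passing the
 * class of a per-chunk reader whose constructor takes
 * (CombineFileSplit, Configuration, Reporter, Integer):
 *
 * <pre>{@code
 * public class MyCombineInputFormat
 *     extends CombineFileInputFormat<LongWritable, Text> {
 *   public RecordReader<LongWritable, Text> getRecordReader(
 *       InputSplit split, JobConf job, Reporter reporter) throws IOException {
 *     // an unchecked cast is needed because the constructor takes
 *     // Class<RecordReader<K, V>>
 *     return new CombineFileRecordReader<LongWritable, Text>(
 *         job, (CombineFileSplit) split, reporter,
 *         (Class) MyChunkRecordReader.class);
 *   }
 * }
 * }</pre>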
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {

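  // Parameter types of the constructor that each delegate RecordReader class
  // must declare: the enclosing CombineFileSplit, the job Configuration, the
  // task Reporter, and the Integer index of the chunk to read.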
  static final Class<?>[] constructorSignature = new Class<?>[]
                                         {CombineFileSplit.class,
                                          Configuration.class,
                                          Reporter.class,
                                          Integer.class};

  protected CombineFileSplit split;
  protected JobConf jc;
  protected Reporter reporter;
  protected Constructor<RecordReader<K, V>> rrConstructor;

  protected int idx;
  protected long progress;
  protected RecordReader<K, V> curReader;

  public boolean next(K key, V value) throws IOException {
    while ((curReader == null) || !curReader.next(key, value)) {
      if (!initNextRecordReader()) {
        return false;
      }
    }
    return true;
  }

  public K createKey() {
    return curReader.createKey();
  }

  public V createValue() {
    return curReader.createValue();
  }

  /**
   * Return the amount of data processed so far, in bytes; only chunks that
   * have been fully read are counted.
   */
  public long getPos() throws IOException {
    return progress;
  }

  public void close() throws IOException {
    if (curReader != null) {
      curReader.close();
      curReader = null;
    }
  }

  /**
   * Return the fraction of this split processed so far, in the range [0, 1].
   * Progress advances only as each underlying chunk is fully consumed.
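   *
   * <p>For example (hypothetical chunk sizes): for a split containing chunks
   * of 10, 20 and 30 bytes, this returns 30 / 60 = 0.5 once the first two
   * chunks have been fully read, regardless of how far the current reader is
   * into the third chunk.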
   */
  public float getProgress() throws IOException {
    return Math.min(1.0f, progress / (float) split.getLength());
  }

  /**
   * Construct a RecordReader for the given {@link CombineFileSplit} that
   * delegates to a new instance of {@code rrClass} for each chunk.
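   *
   * <p>The supplied {@code rrClass} must declare a constructor matching
   * {@code constructorSignature}, i.e. (CombineFileSplit, Configuration,
   * Reporter, Integer), where the Integer is the index of the chunk to read.
   * A sketch (the reader name is hypothetical, for illustration only):
   *
   * <pre>{@code
   * public MyChunkRecordReader(CombineFileSplit split, Configuration conf,
   *                            Reporter reporter, Integer index)
   *     throws IOException {
   *   // open split.getPath(index) at split.getOffset(index) and read
   *   // split.getLength(index) bytes
   * }
   * }</pre>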
   */
  public CombineFileRecordReader(JobConf job, CombineFileSplit split,
                                 Reporter reporter,
                                 Class<RecordReader<K, V>> rrClass)
    throws IOException {
    this.split = split;
    this.jc = job;
    this.reporter = reporter;
    this.idx = 0;
    this.curReader = null;
    this.progress = 0;

    try {
      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
      rrConstructor.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException(rrClass.getName() +
                                 " does not have a valid constructor", e);
    }
    initNextRecordReader();
  }

  /**
   * Close the current chunk's reader, if any, and open a record reader for
   * the next chunk in this CombineFileSplit.
   * @return false if all chunks have already been processed
   */
  protected boolean initNextRecordReader() throws IOException {
    if (curReader != null) {
      curReader.close();
      curReader = null;
      if (idx > 0) {
        progress += split.getLength(idx-1);   // count the chunk just completed
      }
    }

    // if all chunks have been processed, nothing more to do.
    if (idx == split.getNumPaths()) {
      return false;
    }

    // get a record reader for the idx-th chunk
    try {
      curReader = rrConstructor.newInstance(new Object[]
                            {split, jc, reporter, Integer.valueOf(idx)});

      // set up some helper config variables describing the current chunk.
      jc.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
      jc.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
      jc.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    idx++;
    return true;
  }
}