001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.*;
022    import java.lang.reflect.*;
023    
024    import org.apache.hadoop.fs.FileSystem;
025    
026    import org.apache.hadoop.mapreduce.*;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    
031    /**
032     * A generic RecordReader that can hand out different recordReaders
033     * for each chunk in a {@link CombineFileSplit}.
034     * A CombineFileSplit can combine data chunks from multiple files. 
035     * This class allows using different RecordReaders for processing
036     * these data chunks from different files.
037     * @see CombineFileSplit
038     */
039    @InterfaceAudience.Public
040    @InterfaceStability.Stable
041    public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
042    
043      static final Class [] constructorSignature = new Class [] 
044                                             {CombineFileSplit.class,
045                                              TaskAttemptContext.class,
046                                              Integer.class};
047    
048      protected CombineFileSplit split;
049      protected Constructor<? extends RecordReader<K,V>> rrConstructor;
050      protected TaskAttemptContext context;
051      
052      protected int idx;
053      protected long progress;
054      protected RecordReader<K, V> curReader;
055      
056      public void initialize(InputSplit split,
057          TaskAttemptContext context) throws IOException, InterruptedException {
058        this.split = (CombineFileSplit)split;
059        this.context = context;
060        if (null != this.curReader) {
061          this.curReader.initialize(split, context);
062        }
063      }
064      
065      public boolean nextKeyValue() throws IOException, InterruptedException {
066    
067        while ((curReader == null) || !curReader.nextKeyValue()) {
068          if (!initNextRecordReader()) {
069            return false;
070          }
071        }
072        return true;
073      }
074    
075      public K getCurrentKey() throws IOException, InterruptedException {
076        return curReader.getCurrentKey();
077      }
078      
079      public V getCurrentValue() throws IOException, InterruptedException {
080        return curReader.getCurrentValue();
081      }
082      
083      public void close() throws IOException {
084        if (curReader != null) {
085          curReader.close();
086          curReader = null;
087        }
088      }
089      
090      /**
091       * return progress based on the amount of data processed so far.
092       */
093      public float getProgress() throws IOException, InterruptedException {
094        long subprogress = 0;    // bytes processed in current split
095        if (null != curReader) {
096          // idx is always one past the current subsplit's true index.
097          subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
098        }
099        return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
100      }
101      
102      /**
103       * A generic RecordReader that can hand out different recordReaders
104       * for each chunk in the CombineFileSplit.
105       */
106      public CombineFileRecordReader(CombineFileSplit split,
107                                     TaskAttemptContext context,
108                                     Class<? extends RecordReader<K,V>> rrClass)
109        throws IOException {
110        this.split = split;
111        this.context = context;
112        this.idx = 0;
113        this.curReader = null;
114        this.progress = 0;
115    
116        try {
117          rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
118          rrConstructor.setAccessible(true);
119        } catch (Exception e) {
120          throw new RuntimeException(rrClass.getName() + 
121                                     " does not have valid constructor", e);
122        }
123        initNextRecordReader();
124      }
125      
126      /**
127       * Get the record reader for the next chunk in this CombineFileSplit.
128       */
129      protected boolean initNextRecordReader() throws IOException {
130    
131        if (curReader != null) {
132          curReader.close();
133          curReader = null;
134          if (idx > 0) {
135            progress += split.getLength(idx-1);    // done processing so far
136          }
137        }
138    
139        // if all chunks have been processed, nothing more to do.
140        if (idx == split.getNumPaths()) {
141          return false;
142        }
143    
144        // get a record reader for the idx-th chunk
145        try {
146          Configuration conf = context.getConfiguration();
147          // setup some helper config variables.
148          conf.set(MRJobConfig.MAP_INPUT_FILE, split.getPath(idx).toString());
149          conf.setLong(MRJobConfig.MAP_INPUT_START, split.getOffset(idx));
150          conf.setLong(MRJobConfig.MAP_INPUT_PATH, split.getLength(idx));
151    
152          curReader =  rrConstructor.newInstance(new Object [] 
153                                {split, context, Integer.valueOf(idx)});
154    
155          if (idx > 0) {
156            // initialize() for the first RecordReader will be called by MapTask;
157            // we're responsible for initializing subsequent RecordReaders.
158            curReader.initialize(split, context);
159          }
160        } catch (Exception e) {
161          throw new RuntimeException (e);
162        }
163        idx++;
164        return true;
165      }
166    }