001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred.lib; 020 021 import java.io.*; 022 import java.lang.reflect.*; 023 024 import org.apache.hadoop.fs.FileSystem; 025 026 import org.apache.hadoop.mapred.*; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceStability; 029 import org.apache.hadoop.conf.Configuration; 030 031 /** 032 * A generic RecordReader that can hand out different recordReaders 033 * for each chunk in a {@link CombineFileSplit}. 034 * A CombineFileSplit can combine data chunks from multiple files. 035 * This class allows using different RecordReaders for processing 036 * these data chunks from different files. 037 * @see CombineFileSplit 038 */ 039 @InterfaceAudience.Public 040 @InterfaceStability.Stable 041 public class CombineFileRecordReader<K, V> implements RecordReader<K, V> { 042 043 static final Class [] constructorSignature = new Class [] 044 {CombineFileSplit.class, 045 Configuration.class, 046 Reporter.class, 047 Integer.class}; 048 049 protected CombineFileSplit split; 050 protected JobConf jc; 051 protected Reporter reporter; 052 protected Constructor<RecordReader<K, V>> rrConstructor; 053 054 protected int idx; 055 protected long progress; 056 protected RecordReader<K, V> curReader; 057 058 public boolean next(K key, V value) throws IOException { 059 060 while ((curReader == null) || !curReader.next(key, value)) { 061 if (!initNextRecordReader()) { 062 return false; 063 } 064 } 065 return true; 066 } 067 068 public K createKey() { 069 return curReader.createKey(); 070 } 071 072 public V createValue() { 073 return curReader.createValue(); 074 } 075 076 /** 077 * return the amount of data processed 078 */ 079 public long getPos() throws IOException { 080 return progress; 081 } 082 083 public void close() throws IOException { 084 if (curReader != null) { 085 curReader.close(); 086 curReader = null; 087 } 088 } 089 090 /** 091 * return progress based on the amount of data processed so far. 092 */ 093 public float getProgress() throws IOException { 094 return Math.min(1.0f, progress/(float)(split.getLength())); 095 } 096 097 /** 098 * A generic RecordReader that can hand out different recordReaders 099 * for each chunk in the CombineFileSplit. 100 */ 101 public CombineFileRecordReader(JobConf job, CombineFileSplit split, 102 Reporter reporter, 103 Class<RecordReader<K, V>> rrClass) 104 throws IOException { 105 this.split = split; 106 this.jc = job; 107 this.reporter = reporter; 108 this.idx = 0; 109 this.curReader = null; 110 this.progress = 0; 111 112 try { 113 rrConstructor = rrClass.getDeclaredConstructor(constructorSignature); 114 rrConstructor.setAccessible(true); 115 } catch (Exception e) { 116 throw new RuntimeException(rrClass.getName() + 117 " does not have valid constructor", e); 118 } 119 initNextRecordReader(); 120 } 121 122 /** 123 * Get the record reader for the next chunk in this CombineFileSplit. 124 */ 125 protected boolean initNextRecordReader() throws IOException { 126 127 if (curReader != null) { 128 curReader.close(); 129 curReader = null; 130 if (idx > 0) { 131 progress += split.getLength(idx-1); // done processing so far 132 } 133 } 134 135 // if all chunks have been processed, nothing more to do. 136 if (idx == split.getNumPaths()) { 137 return false; 138 } 139 140 // get a record reader for the idx-th chunk 141 try { 142 curReader = rrConstructor.newInstance(new Object [] 143 {split, jc, reporter, Integer.valueOf(idx)}); 144 145 // setup some helper config variables. 146 jc.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString()); 147 jc.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx)); 148 jc.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx)); 149 } catch (Exception e) { 150 throw new RuntimeException (e); 151 } 152 idx++; 153 return true; 154 } 155 }