/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/** 
 * A base class for file-based {@link InputFormat}s.
 * 
 * <p><code>FileInputFormat</code> is the base class for all file-based 
 * <code>InputFormat</code>s. It provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the 
 * {@link #isSplitable(FileSystem, Path)} method to ensure that input files
 * are not split up and are each processed as a whole by a {@link Mapper}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;
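
  // Default filter applied to every input directory listing: paths whose
  // names start with "_" or "." are treated as hidden and skipped. This
  // keeps framework metadata such as _SUCCESS markers or _logs directories,
  // and dot-files such as checksum files, from being fed to the mappers.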
  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };
  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus(JobConf) to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splittable? Usually true, but if the file is
   * stream compressed, it will not be.
   * 
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split up,
   * so that {@link Mapper}s process entire files.
   * 
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return <code>true</code> if this file can be split, <code>false</code> otherwise
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }
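
  // Illustrative sketch (not part of the original class): a format whose
  // records cannot be read starting from the middle of a file, e.g. one
  // backed by a stream compression codec, would typically override
  // isSplitable() so that each file becomes exactly one split:
  //
  //   public class WholeFileTextInputFormat extends TextInputFormat {  // hypothetical name
  //     @Override
  //     protected boolean isSplitable(FileSystem fs, Path file) {
  //       return false;   // never split; one mapper per file
  //     }
  //   }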

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                               JobConf job,
                                               Reporter reporter)
    throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param conf the job configuration to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param conf the job configuration to read the filter class from
   * @return the PathFilter instance set for the job, or <code>null</code> if
   *         none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
          null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
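
  // Illustrative sketch (assumes a user-defined filter class): a job can
  // exclude, say, ".tmp" files by registering a PathFilter; the built-in
  // hiddenFileFilter is always applied in addition to it.
  //
  //   public class SkipTmpFiles implements PathFilter {   // hypothetical name
  //     public boolean accept(Path path) {
  //       return !path.getName().endsWith(".tmp");
  //     }
  //   }
  //
  //   FileInputFormat.setInputPathFilter(job, SkipTmpFiles.class);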

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
      if (stat.isDirectory()) {
        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
      } else {
        result.add(stat);
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression. 
   * 
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or if an input path
   *         does not exist or matches no files.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // Get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to look recursively into the directory structure
    boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);

    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            for(FileStatus stat: fs.listStatus(globStat.getPath(),
                inputFilter)) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
              } else {
                result.add(stat);
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result.toArray(new FileStatus[result.size()]);
  }
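
  // Note on directory traversal: by default listStatus() only expands each
  // input directory one level; a sub-directory found inside an input
  // directory is added as-is and later makes getSplits() fail with
  // "Not a file". To walk nested directories instead, enable the recursive
  // flag read above, e.g. in the driver (assumes a JobConf named "job"):
  //
  //   job.setBoolean("mapred.input.dir.recursive", true);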

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to create sub-types of {@link FileSplit}.
   */
  protected FileSplit makeSplit(Path file, long start, long length, 
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/ 
  @SuppressWarnings("deprecation")
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations, 
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                               splitHosts));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          String[] splitHosts = getSplitHosts(blkLocations, length
              - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts));
        }
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts));
      } else {
        // Create empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }
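
  // How the 10% slop plays out in the loop above (illustrative numbers): with
  // a 128 MB split size, a 140 MB file is not chopped into 128 MB + 12 MB.
  // Since 140/128 is about 1.09, below SPLIT_SLOP (1.1), the while loop exits
  // immediately and the whole remainder becomes a single 140 MB split,
  // avoiding a tiny trailing split.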

  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
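
  // Worked example for the formula above (illustrative numbers): if a job
  // asks for numSplits = 4 over 1 GB of input, goalSize is 256 MB; with a
  // 128 MB block size and the default minimum of 1 byte, this returns
  // max(1, min(256 MB, 128 MB)) = 128 MB, i.e. splits normally align with
  // blocks. Raising the minimum split size (FileInputFormat.SPLIT_MINSIZE in
  // the job configuration, or setMinSplitSize()) is the usual way to force
  // splits larger than a block.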

  protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs 
   * for the map-reduce job.
   * 
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as 
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   *  the map-reduce job.
   * 
   * @param conf The configuration of the job 
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   * 
   * @param conf Configuration of the job. 
   * @param inputPaths the {@link Path}s of the input directories/files 
   * for the map-reduce job.
   */ 
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for(int i = 1; i < inputPaths.length;i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   * 
   * @param conf The configuration of the job 
   * @param path {@link Path} to be added to the list of inputs for 
   *            the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr);
  }
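
  // Illustrative driver usage (assumes a JobConf named "job" and example
  // paths): input paths are stored as one escaped, comma-separated string
  // under FileInputFormat.INPUT_DIR, so setInputPaths() replaces the whole
  // list while addInputPath()/addInputPaths() append to it.
  //
  //   FileInputFormat.setInputPaths(job, new Path("/data/2015"),
  //                                      new Path("/data/2016"));
  //   FileInputFormat.addInputPath(job, new Path("/data/extra"));
  //   FileInputFormat.addInputPaths(job, "/data/a,/data/b");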

  // Splits a comma-separated list of path strings into individual paths,
  // treating commas inside {...} glob patterns as part of the path rather
  // than as separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i=0; i<length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }
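
  // For example, getPathStrings("/data/{jan,feb},/logs/2016") yields the two
  // path strings "/data/{jan,feb}" and "/logs/2016": the comma inside the
  // curly-brace glob is kept, and only the top-level comma separates paths.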

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   * 
   * @param conf The configuration of the job 
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, "");
    String [] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }


  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo> () {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    }
    );
  }

  /** 
   * This function identifies and returns the hosts that contribute 
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that 
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset The offset of the split within the file
   * @param splitSize The size of the split
   * @param clusterMap The network topology to register hosts and racks in
   * @return array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations, 
      long offset, long splitSize, NetworkTopology clusterMap)
  throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
                          blkLocations[startIndex].getLength() - offset;

    // If the split fits entirely inside this block, just return its hosts
    if (bytesInThisBlock >= splitSize) {
      return blkLocations[startIndex].getHosts();
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
    Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
    String [] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of 
    // bytes at each level. See TestGetSplitHosts.java 

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels 

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node,nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode,parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    return identifyHosts(allTopos.length, racksMap);
  }
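
  // Sketch of the outcome (illustrative numbers, hypothetical hosts): suppose
  // a split spans two blocks, the first contributing 100 MB replicated on
  // hosts h1 and h2, the second contributing 28 MB replicated on h2 and h3.
  // Then h2 has contributed 128 MB, h1 100 MB and h3 28 MB, so
  // identifyHosts() below lists h2 first and the split is preferentially
  // scheduled there.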

  private String[] identifyHosts(int replicationFactor, 
                                 Map<Node,NodeInfo> racksMap) {

    String [] retVal = new String[replicationFactor];

    List <NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
  throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }
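
  // With no topology information, every replica is placed under
  // NetworkTopology.DEFAULT_RACK ("/default-rack"), so the synthesized
  // topology path for a host looks like "/default-rack/hostname" and all
  // hosts are treated as sharing a single rack.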


  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() {return value;}

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves;}

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}