001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collection;
024    import java.util.LinkedList;
025    import java.util.HashSet;
026    import java.util.List;
027    import java.util.HashMap;
028    import java.util.Set;
029    import java.util.Iterator;
030    import java.util.Map;
031    
032    import org.apache.hadoop.classification.InterfaceAudience;
033    import org.apache.hadoop.classification.InterfaceStability;
034    import org.apache.hadoop.conf.Configuration;
035    import org.apache.hadoop.fs.FileSystem;
036    import org.apache.hadoop.fs.FileUtil;
037    import org.apache.hadoop.fs.Path;
038    import org.apache.hadoop.fs.BlockLocation;
039    import org.apache.hadoop.fs.FileStatus;
040    import org.apache.hadoop.fs.PathFilter;
041    import org.apache.hadoop.io.compress.CompressionCodec;
042    import org.apache.hadoop.io.compress.CompressionCodecFactory;
043    import org.apache.hadoop.io.compress.SplittableCompressionCodec;
044    import org.apache.hadoop.mapreduce.InputFormat;
045    import org.apache.hadoop.mapreduce.InputSplit;
046    import org.apache.hadoop.mapreduce.JobContext;
047    import org.apache.hadoop.mapreduce.RecordReader;
048    import org.apache.hadoop.mapreduce.TaskAttemptContext;
049    import org.apache.hadoop.net.NodeBase;
050    import org.apache.hadoop.net.NetworkTopology;
051    
052    /**
053     * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
054     * the {@link InputFormat#getSplits(JobContext)} method. 
055     * 
056     * Splits are constructed from the files under the input paths. 
057     * A split cannot have files from different pools.
058     * Each split returned may contain blocks from different files.
059     * If a maxSplitSize is specified, then blocks on the same node are
060     * combined to form a single split. Blocks that are left over are
061     * then combined with other blocks in the same rack. 
062     * If maxSplitSize is not specified, then blocks from the same rack
063     * are combined in a single split; no attempt is made to create
064     * node-local splits.
065     * If the maxSplitSize is equal to the block size, then this class
066     * is similar to the default splitting behavior in Hadoop: each
067     * block is a locally processed split.
068     * Subclasses implement 
069     * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
070     * to construct <code>RecordReader</code>'s for 
071     * <code>CombineFileSplit</code>'s.
072     * 
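        * <p>
        * A minimal subclass might look like the following sketch. The names
        * <code>MyInputFormat</code> and <code>MyRecordReader</code> are purely
        * illustrative, and the per-file reader is assumed to provide the
        * <code>(CombineFileSplit, TaskAttemptContext, Integer)</code> constructor
        * that {@link CombineFileRecordReader} expects:
        * <pre>{@code
        * public class MyInputFormat extends CombineFileInputFormat<LongWritable, Text> {
        *   public MyInputFormat() {
        *     // cap each combined split at roughly 128 MB (illustrative value)
        *     setMaxSplitSize(128 * 1024 * 1024);
        *   }
        *   public RecordReader<LongWritable, Text> createRecordReader(
        *       InputSplit split, TaskAttemptContext context) throws IOException {
        *     return new CombineFileRecordReader<LongWritable, Text>(
        *         (CombineFileSplit) split, context, MyRecordReader.class);
        *   }
        * }
        * }</pre>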
073     * @see CombineFileSplit
074     */
075    @InterfaceAudience.Public
076    @InterfaceStability.Stable
077    public abstract class CombineFileInputFormat<K, V>
078      extends FileInputFormat<K, V> {
079    
080      public static final String SPLIT_MINSIZE_PERNODE = 
081        "mapreduce.input.fileinputformat.split.minsize.per.node";
082      public static final String SPLIT_MINSIZE_PERRACK = 
083        "mapreduce.input.fileinputformat.split.minsize.per.rack";
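         // For example, a job could set these thresholds in its Configuration
         // instead of calling the protected setters below (the byte values here
         // are purely illustrative):
         //   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 64L * 1024 * 1024);
         //   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 256L * 1024 * 1024);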
084      // ability to limit the size of a single split
085      private long maxSplitSize = 0;
086      private long minSplitSizeNode = 0;
087      private long minSplitSizeRack = 0;
088    
089      // A list of input path filter pools. A split cannot have blocks from
090      // files across multiple pools.
091      private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
092    
093      // mapping from a rack name to the set of Nodes in the rack 
094      private HashMap<String, Set<String>> rackToNodes = 
095                                new HashMap<String, Set<String>>();
096      /**
097       * Specify the maximum size (in bytes) of each split. Blocks are combined
098       * into a split until it reaches approximately this size.
099       */
100      protected void setMaxSplitSize(long maxSplitSize) {
101        this.maxSplitSize = maxSplitSize;
102      }
103    
104      /**
105       * Specify the minimum size (in bytes) of each split per node.
106       * This applies to data that is left over after combining data on a single
107       * node into splits of at most the maximum size specified by maxSplitSize.
108       * This leftover data will be combined into its own split if its size is
109       * at least minSplitSizeNode.
110       */
111      protected void setMinSplitSizeNode(long minSplitSizeNode) {
112        this.minSplitSizeNode = minSplitSizeNode;
113      }
114    
115      /**
116       * Specify the minimum size (in bytes) of each split per rack.
117       * This applies to data that is left over after combining data on a single
118       * rack into splits of at most the maximum size specified by maxSplitSize.
119       * This leftover data will be combined into its own split if its size is
120       * at least minSplitSizeRack.
121       */
122      protected void setMinSplitSizeRack(long minSplitSizeRack) {
123        this.minSplitSizeRack = minSplitSizeRack;
124      }
125    
126      /**
127       * Create a new pool and add the filters to it.
128       * A split cannot have files from different pools.
129       */
130      protected void createPool(List<PathFilter> filters) {
131        pools.add(new MultiPathFilter(filters));
132      }
133    
134      /**
135       * Create a new pool and add the filters to it. 
136       * A pathname can satisfy any one of the specified filters.
137       * A split cannot have files from different pools.
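          * <p>
          * For example, a subclass could group all log files into one pool
          * (the ".log" suffix is illustrative):
          * <pre>{@code
          * createPool(new PathFilter() {
          *   public boolean accept(Path path) {
          *     return path.getName().endsWith(".log");
          *   }
          * });
          * }</pre>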
138       */
139      protected void createPool(PathFilter... filters) {
140        MultiPathFilter multi = new MultiPathFilter();
141        for (PathFilter f: filters) {
142          multi.add(f);
143        }
144        pools.add(multi);
145      }
146      
147      @Override
148      protected boolean isSplitable(JobContext context, Path file) {
149        final CompressionCodec codec =
150          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
151        if (null == codec) {
152          return true;
153        }
154        return codec instanceof SplittableCompressionCodec;
155      }
156    
157      /**
158       * Default constructor.
159       */
160      public CombineFileInputFormat() {
161      }
162    
163      @Override
164      public List<InputSplit> getSplits(JobContext job) 
165        throws IOException {
166    
167        long minSizeNode = 0;
168        long minSizeRack = 0;
169        long maxSize = 0;
170        Configuration conf = job.getConfiguration();
171    
172        // the values specified by the setXxxSplitSize() methods take precedence
173        // over the values that might have been specified in the config
174        if (minSplitSizeNode != 0) {
175          minSizeNode = minSplitSizeNode;
176        } else {
177          minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
178        }
179        if (minSplitSizeRack != 0) {
180          minSizeRack = minSplitSizeRack;
181        } else {
182          minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
183        }
184        if (maxSplitSize != 0) {
185          maxSize = maxSplitSize;
186        } else {
187          maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
188        }
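           // Note: a maxSize of 0 (the default) means there is no per-split cap;
           // in that case blocks are combined per rack rather than per node, as
           // described in the class Javadoc.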
189        if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
190          throw new IOException("Minimum split size per node " + minSizeNode +
191                                " cannot be larger than maximum split size " +
192                                maxSize);
193        }
194        if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
195          throw new IOException("Minimum split size per rack " + minSizeRack +
196                                " cannot be larger than maximum split size " +
197                                maxSize);
198        }
199        if (minSizeRack != 0 && minSizeNode > minSizeRack) {
200          throw new IOException("Minimum split size per node " + minSizeNode +
201                                " cannot be smaller than minimum split " +
202                                "size per rack " + minSizeRack);
203        }
204    
205        // all the files in input set
206        Path[] paths = FileUtil.stat2Paths(
207                         listStatus(job).toArray(new FileStatus[0]));
208        List<InputSplit> splits = new ArrayList<InputSplit>();
209        if (paths.length == 0) {
210          return splits;    
211        }
212    
213        // Qualify the paths against their file systems first. This is a costly
214        // operation, so we do it once up front; otherwise it would be repeated
215        // once for each pool in the next loop.
216        List<Path> newpaths = new LinkedList<Path>();
217        for (int i = 0; i < paths.length; i++) {
218          FileSystem fs = paths[i].getFileSystem(conf);
219          Path p = fs.makeQualified(paths[i]);
220          newpaths.add(p);
221        }
222    
223        // In one single iteration, process all the paths in a single pool.
224        // Processing one pool at a time ensures that a split contains paths
225        // from a single pool only.
226        for (MultiPathFilter onepool : pools) {
227          ArrayList<Path> myPaths = new ArrayList<Path>();
228          
229        // pick one input path. If it matches any of the filters in this pool,
230        // add it to the output set
231          for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
232            Path p = iter.next();
233            if (onepool.accept(p)) {
234              myPaths.add(p); // add it to my output set
235              iter.remove();
236            }
237          }
238          // create splits for all files in this pool.
239          getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
240                        maxSize, minSizeNode, minSizeRack, splits);
241        }
242    
243        // create splits for all files that are not in any pool.
244        getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
245                      maxSize, minSizeNode, minSizeRack, splits);
246    
247        // free up rackToNodes map
248        rackToNodes.clear();
249        return splits;    
250      }
251    
252      /**
253       * Create splits for the specified set of paths and add them to the given list.
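          * Blocks are first combined into node-local splits (bounded by maxSize),
          * then the remaining blocks are combined rack by rack, and finally any
          * leftover 'overflow' blocks are merged across racks.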
254       */
255      private void getMoreSplits(JobContext job, Path[] paths, 
256                                 long maxSize, long minSizeNode, long minSizeRack,
257                                 List<InputSplit> splits)
258        throws IOException {
259        Configuration conf = job.getConfiguration();
260    
261        // all blocks for all the files in input set
262        OneFileInfo[] files;
263      
264        // mapping from a rack name to the list of blocks it has
265        HashMap<String, List<OneBlockInfo>> rackToBlocks = 
266                                  new HashMap<String, List<OneBlockInfo>>();
267    
268        // mapping from a block to the nodes on which it has replicas
269        HashMap<OneBlockInfo, String[]> blockToNodes = 
270                                  new HashMap<OneBlockInfo, String[]>();
271    
272        // mapping from a node to the list of blocks that it contains
273        HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
274                                  new HashMap<String, List<OneBlockInfo>>();
275        
276        files = new OneFileInfo[paths.length];
277        if (paths.length == 0) {
278          return; 
279        }
280    
281        // populate all the blocks for all files
282        long totLength = 0;
283        for (int i = 0; i < paths.length; i++) {
284          files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
285                                     rackToBlocks, blockToNodes, nodeToBlocks,
286                                     rackToNodes, maxSize);
287          totLength += files[i].getLength();
288        }
289    
290        ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
291        Set<String> nodes = new HashSet<String>();
292        long curSplitSize = 0;
293    
294        // process all nodes and create splits that are local
295        // to a node. 
296        for (Iterator<Map.Entry<String, 
297             List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
298             iter.hasNext();) {
299    
300          Map.Entry<String, List<OneBlockInfo>> one = iter.next();
301          nodes.add(one.getKey());
302          List<OneBlockInfo> blocksInNode = one.getValue();
303    
304          // for each block, copy it into validBlocks. Delete it from 
305          // blockToNodes so that the same block does not appear in 
306          // two different splits.
307          for (OneBlockInfo oneblock : blocksInNode) {
308            if (blockToNodes.containsKey(oneblock)) {
309              validBlocks.add(oneblock);
310              blockToNodes.remove(oneblock);
311              curSplitSize += oneblock.length;
312    
313              // if the accumulated split size exceeds the maximum, then 
314              // create this split.
315              if (maxSize != 0 && curSplitSize >= maxSize) {
316                // create an input split and add it to the splits array
317                addCreatedSplit(splits, nodes, validBlocks);
318                curSplitSize = 0;
319                validBlocks.clear();
320              }
321            }
322          }
323          // if there were any blocks left over and their combined size is
324        // at least minSizeNode, then combine them into one split.
325          // Otherwise add them back to the unprocessed pool. It is likely 
326          // that they will be combined with other blocks from the 
327          // same rack later on.
328          if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
329            // create an input split and add it to the splits array
330            addCreatedSplit(splits, nodes, validBlocks);
331          } else {
332            for (OneBlockInfo oneblock : validBlocks) {
333              blockToNodes.put(oneblock, oneblock.hosts);
334            }
335          }
336          validBlocks.clear();
337          nodes.clear();
338          curSplitSize = 0;
339        }
340    
341        // if blocks in a rack are below the specified minimum size, then keep them
342        // in 'overflow'. After the processing of all racks is complete, these 
343        // overflow blocks will be combined into splits.
344        ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
345        Set<String> racks = new HashSet<String>();
346    
347        // Process all racks over and over again until there is no more work to do.
348        while (blockToNodes.size() > 0) {
349    
350          // Create one split for this rack before moving on to the next rack.
351          // Come back to this rack after creating a single split for each of the
352          // remaining racks.
353          // Process one rack at a time: combine as many of the blocks that
354          // reside on this rack as possible into one split, constrained by the
355          // minimum and maximum split sizes.
356    
357          // iterate over all racks 
358          for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
359               rackToBlocks.entrySet().iterator(); iter.hasNext();) {
360    
361            Map.Entry<String, List<OneBlockInfo>> one = iter.next();
362            racks.add(one.getKey());
363            List<OneBlockInfo> blocks = one.getValue();
364    
365            // for each block, copy it into validBlocks. Delete it from 
366            // blockToNodes so that the same block does not appear in 
367            // two different splits.
368            boolean createdSplit = false;
369            for (OneBlockInfo oneblock : blocks) {
370              if (blockToNodes.containsKey(oneblock)) {
371                validBlocks.add(oneblock);
372                blockToNodes.remove(oneblock);
373                curSplitSize += oneblock.length;
374          
375                // if the accumulated split size exceeds the maximum, then 
376                // create this split.
377                if (maxSize != 0 && curSplitSize >= maxSize) {
378                  // create an input split and add it to the splits array
379                  addCreatedSplit(splits, getHosts(racks), validBlocks);
380                  createdSplit = true;
381                  break;
382                }
383              }
384            }
385    
386            // if we created a split, then just go to the next rack
387            if (createdSplit) {
388              curSplitSize = 0;
389              validBlocks.clear();
390              racks.clear();
391              continue;
392            }
393    
394            if (!validBlocks.isEmpty()) {
395              if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
396                // if there is a minimum size specified, then create a single split
397                // otherwise, store these blocks into overflow data structure
398                addCreatedSplit(splits, getHosts(racks), validBlocks);
399              } else {
400            // There were a few blocks in this rack that remained to be
401            // processed. Keep them in the 'overflow' block list; they will
402            // be combined later.
403                overflowBlocks.addAll(validBlocks);
404              }
405            }
406            curSplitSize = 0;
407            validBlocks.clear();
408            racks.clear();
409          }
410        }
411    
412        assert blockToNodes.isEmpty();
413        assert curSplitSize == 0;
414        assert validBlocks.isEmpty();
415        assert racks.isEmpty();
416    
417        // Process all overflow blocks
418        for (OneBlockInfo oneblock : overflowBlocks) {
419          validBlocks.add(oneblock);
420          curSplitSize += oneblock.length;
421    
422          // This might cause an existing rack location to be re-added,
423          // but it should be ok.
424          for (int i = 0; i < oneblock.racks.length; i++) {
425            racks.add(oneblock.racks[i]);
426          }
427    
428          // if the accumulated split size exceeds the maximum, then 
429          // create this split.
430          if (maxSize != 0 && curSplitSize >= maxSize) {
431            // create an input split and add it to the splits array
432            addCreatedSplit(splits, getHosts(racks), validBlocks);
433            curSplitSize = 0;
434            validBlocks.clear();
435            racks.clear();
436          }
437        }
438    
439        // Process any remaining blocks.
440        if (!validBlocks.isEmpty()) {
441          addCreatedSplit(splits, getHosts(racks), validBlocks);
442        }
443      }
444    
445      /**
446       * Create a single split from the list of blocks specified in validBlocks.
447       * Add this new split into splitList.
448       */
449      private void addCreatedSplit(List<InputSplit> splitList, 
450                                   Collection<String> locations, 
451                                   ArrayList<OneBlockInfo> validBlocks) {
452        // create an input split
453        Path[] fl = new Path[validBlocks.size()];
454        long[] offset = new long[validBlocks.size()];
455        long[] length = new long[validBlocks.size()];
456        for (int i = 0; i < validBlocks.size(); i++) {
457          fl[i] = validBlocks.get(i).onepath; 
458          offset[i] = validBlocks.get(i).offset;
459          length[i] = validBlocks.get(i).length;
460        }
461    
462         // add this split to the list that is returned
463        CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
464                                       length, locations.toArray(new String[0]));
465        splitList.add(thissplit); 
466      }
467    
468      /**
469       * Subclasses implement this to construct a {@link RecordReader} for a {@link CombineFileSplit}.
470       */
471      public abstract RecordReader<K, V> createRecordReader(InputSplit split,
472          TaskAttemptContext context) throws IOException;
473    
474      /**
475       * Information about one file from the file system.
476       */
477      private static class OneFileInfo {
478        private long fileSize;               // size of the file
479        private OneBlockInfo[] blocks;       // all blocks in this file
480    
481        OneFileInfo(Path path, Configuration conf,
482                    boolean isSplitable,
483                    HashMap<String, List<OneBlockInfo>> rackToBlocks,
484                    HashMap<OneBlockInfo, String[]> blockToNodes,
485                    HashMap<String, List<OneBlockInfo>> nodeToBlocks,
486                    HashMap<String, Set<String>> rackToNodes,
487                    long maxSize)
488                    throws IOException {
489          this.fileSize = 0;
490    
491          // get block locations from file system
492          FileSystem fs = path.getFileSystem(conf);
493          FileStatus stat = fs.getFileStatus(path);
494          BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
495                                                               stat.getLen());
496          // create a list of all blocks and their locations
497          if (locations == null) {
498            blocks = new OneBlockInfo[0];
499          } else {
500    
501            if (locations.length == 0) {
502              locations = new BlockLocation[] { new BlockLocation() };
503            }
504    
505            if (!isSplitable) {
506              // if the file is not splitable, just create the one block with
507              // full file length
508              blocks = new OneBlockInfo[1];
509              fileSize = stat.getLen();
510              blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
511                  .getHosts(), locations[0].getTopologyPaths());
512            } else {
513              ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
514                  locations.length);
515              for (int i = 0; i < locations.length; i++) {
516                fileSize += locations[i].getLength();
517    
518                // each split can be a maximum of maxSize
519                long left = locations[i].getLength();
520                long myOffset = locations[i].getOffset();
521                long myLength = 0;
522                do {
523                  if (maxSize == 0) {
524                    myLength = left;
525                  } else {
526                    if (left > maxSize && left < 2 * maxSize) {
527                      // if remainder is between max and 2*max - then
528                      // instead of creating splits of size max, left-max we
529                      // create splits of size left/2 and left/2. This is
530                      // a heuristic to avoid creating really really small
531                      // splits.
532                      myLength = left / 2;
533                    } else {
534                      myLength = Math.min(maxSize, left);
535                    }
536                  }
537                  OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
538                      myLength, locations[i].getHosts(), locations[i]
539                          .getTopologyPaths());
540                  left -= myLength;
541                  myOffset += myLength;
542    
543                  blocksList.add(oneblock);
544                } while (left > 0);
545              }
546              blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
547            }
548    
549            for (OneBlockInfo oneblock : blocks) {
550              // add this block to the block --> node locations map
551              blockToNodes.put(oneblock, oneblock.hosts);
552    
553              // For blocks that do not have host/rack information,
554            // assign them to the default rack.
555              String[] racks = null;
556              if (oneblock.hosts.length == 0) {
557                racks = new String[]{NetworkTopology.DEFAULT_RACK};
558              } else {
559                racks = oneblock.racks;
560              }
561    
562              // add this block to the rack --> block map
563              for (int j = 0; j < racks.length; j++) {
564                String rack = racks[j];
565                List<OneBlockInfo> blklist = rackToBlocks.get(rack);
566                if (blklist == null) {
567                  blklist = new ArrayList<OneBlockInfo>();
568                  rackToBlocks.put(rack, blklist);
569                }
570                blklist.add(oneblock);
571                if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
572                  // Add this host to rackToNodes map
573                  addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
574                }
575              }
576    
577              // add this block to the node --> block map
578              for (int j = 0; j < oneblock.hosts.length; j++) {
579                String node = oneblock.hosts[j];
580                List<OneBlockInfo> blklist = nodeToBlocks.get(node);
581                if (blklist == null) {
582                  blklist = new ArrayList<OneBlockInfo>();
583                  nodeToBlocks.put(node, blklist);
584                }
585                blklist.add(oneblock);
586              }
587            }
588          }
589        }
590    
591        long getLength() {
592          return fileSize;
593        }
594    
595        OneBlockInfo[] getBlocks() {
596          return blocks;
597        }
598      }
599    
600      /**
601       * Information about one block from the file system.
602       */
603      private static class OneBlockInfo {
604        Path onepath;                // name of this file
605        long offset;                 // offset in file
606        long length;                 // length of this block
607        String[] hosts;              // nodes on which this block resides
608        String[] racks;              // network topology of hosts
609    
610        OneBlockInfo(Path path, long offset, long len, 
611                     String[] hosts, String[] topologyPaths) {
612          this.onepath = path;
613          this.offset = offset;
614          this.hosts = hosts;
615          this.length = len;
616          assert (hosts.length == topologyPaths.length ||
617                  topologyPaths.length == 0);
618    
619          // if the file system does not have any rack information, then
620          // use dummy rack location.
621          if (topologyPaths.length == 0) {
622            topologyPaths = new String[hosts.length];
623            for (int i = 0; i < topologyPaths.length; i++) {
624              topologyPaths[i] = (new NodeBase(hosts[i], 
625                                  NetworkTopology.DEFAULT_RACK)).toString();
626            }
627          }
628    
629          // The topology paths have the host name included as the last 
630          // component. Strip it.
631          this.racks = new String[topologyPaths.length];
632          for (int i = 0; i < topologyPaths.length; i++) {
633            this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
634          }
635        }
636      }
637    
638      protected BlockLocation[] getFileBlockLocations(
639        FileSystem fs, FileStatus stat) throws IOException {
640        return fs.getFileBlockLocations(stat, 0, stat.getLen());
641      }
642    
643      private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
644                                        String rack, String host) {
645        Set<String> hosts = rackToNodes.get(rack);
646        if (hosts == null) {
647          hosts = new HashSet<String>();
648          rackToNodes.put(rack, hosts);
649        }
650        hosts.add(host);
651      }
652      
653      private Set<String> getHosts(Set<String> racks) {
654        Set<String> hosts = new HashSet<String>();
655        for (String rack : racks) {
656          if (rackToNodes.containsKey(rack)) {
657            hosts.addAll(rackToNodes.get(rack));
658          }
659        }
660        return hosts;
661      }
662      
663      /**
664       * Accept a path only if at least one of the filters given in the
665       * constructor accepts it.
666       */
667      private static class MultiPathFilter implements PathFilter {
668        private List<PathFilter> filters;
669    
670        public MultiPathFilter() {
671          this.filters = new ArrayList<PathFilter>();
672        }
673    
674        public MultiPathFilter(List<PathFilter> filters) {
675          this.filters = filters;
676        }
677    
678        public void add(PathFilter one) {
679          filters.add(one);
680        }
681    
682        public boolean accept(Path path) {
683          for (PathFilter filter : filters) {
684            if (filter.accept(path)) {
685              return true;
686            }
687          }
688          return false;
689        }
690    
691        public String toString() {
692          StringBuffer buf = new StringBuffer();
693          buf.append("[");
694          for (PathFilter f: filters) {
695            buf.append(f);
696            buf.append(",");
697          }
698          buf.append("]");
699          return buf.toString();
700        }
701      }
702    }