001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs;
020    
021    import java.io.*;
022    import java.net.URI;
023    import java.util.*;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.conf.Configuration;
028    
029    /****************************************************************
030     * Implement the FileSystem API for the checksumed local filesystem.
031     *
032     *****************************************************************/
033    @InterfaceAudience.Public
034    @InterfaceStability.Stable
035    public class LocalFileSystem extends ChecksumFileSystem {
036      static final URI NAME = URI.create("file:///");
037      static private Random rand = new Random();
038      
039      public LocalFileSystem() {
040        this(new RawLocalFileSystem());
041      }
042      
043      @Override
044      public void initialize(URI name, Configuration conf) throws IOException {
045        if (fs.getConf() == null) {
046          fs.initialize(name, conf);
047        }
048        String scheme = name.getScheme();
049        if (!scheme.equals(fs.getUri().getScheme())) {
050          swapScheme = scheme;
051        }
052      }
053    
054      /**
055       * Return the protocol scheme for the FileSystem.
056       * <p/>
057       *
058       * @return <code>file</code>
059       */
060      @Override
061      public String getScheme() {
062        return "file";
063      }
064    
065      public FileSystem getRaw() {
066        return getRawFileSystem();
067      }
068        
069      public LocalFileSystem(FileSystem rawLocalFileSystem) {
070        super(rawLocalFileSystem);
071      }
072        
073      /** Convert a path to a File. */
074      public File pathToFile(Path path) {
075        return ((RawLocalFileSystem)fs).pathToFile(path);
076      }
077    
078      @Override
079      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
080        throws IOException {
081        FileUtil.copy(this, src, this, dst, delSrc, getConf());
082      }
083    
084      @Override
085      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
086        throws IOException {
087        FileUtil.copy(this, src, this, dst, delSrc, getConf());
088      }
089    
090      /**
091       * Moves files to a bad file directory on the same device, so that their
092       * storage will not be reused.
093       */
094      @Override
095      public boolean reportChecksumFailure(Path p, FSDataInputStream in,
096                                           long inPos,
097                                           FSDataInputStream sums, long sumsPos) {
098        try {
099          // canonicalize f
100          File f = ((RawLocalFileSystem)fs).pathToFile(p).getCanonicalFile();
101          
102          // find highest writable parent dir of f on the same device
103          String device = new DF(f, getConf()).getMount();
104          File parent = f.getParentFile();
105          File dir = null;
106          while (parent!=null && parent.canWrite() && parent.toString().startsWith(device)) {
107            dir = parent;
108            parent = parent.getParentFile();
109          }
110    
111          if (dir==null) {
112            throw new IOException(
113                                  "not able to find the highest writable parent dir");
114          }
115            
116          // move the file there
117          File badDir = new File(dir, "bad_files");
118          if (!badDir.mkdirs()) {
119            if (!badDir.isDirectory()) {
120              throw new IOException("Mkdirs failed to create " + badDir.toString());
121            }
122          }
123          String suffix = "." + rand.nextInt();
124          File badFile = new File(badDir, f.getName()+suffix);
125          LOG.warn("Moving bad file " + f + " to " + badFile);
126          in.close();                               // close it first
127          boolean b = f.renameTo(badFile);                      // rename it
128          if (!b) {
129            LOG.warn("Ignoring failure of renameTo");
130          }
131          // move checksum file too
132          File checkFile = ((RawLocalFileSystem)fs).pathToFile(getChecksumFile(p));
133          b = checkFile.renameTo(new File(badDir, checkFile.getName()+suffix));
134          if (!b) {
135              LOG.warn("Ignoring failure of renameTo");
136            }
137        } catch (IOException e) {
138          LOG.warn("Error moving bad file " + p + ": " + e);
139        }
140        return false;
141      }
142    }