001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs;
020    
021    import java.io.BufferedOutputStream;
022    import java.io.DataOutput;
023    import java.io.File;
024    import java.io.FileInputStream;
025    import java.io.FileNotFoundException;
026    import java.io.FileOutputStream;
027    import java.io.IOException;
028    import java.io.OutputStream;
029    import java.io.FileDescriptor;
030    import java.net.URI;
031    import java.nio.ByteBuffer;
032    import java.util.Arrays;
033    import java.util.EnumSet;
034    import java.util.StringTokenizer;
035    
036    import org.apache.hadoop.classification.InterfaceAudience;
037    import org.apache.hadoop.classification.InterfaceStability;
038    import org.apache.hadoop.conf.Configuration;
039    import org.apache.hadoop.fs.permission.FsPermission;
040    import org.apache.hadoop.io.nativeio.NativeIO;
041    import org.apache.hadoop.util.Progressable;
042    import org.apache.hadoop.util.Shell;
043    import org.apache.hadoop.util.StringUtils;
044    
045    /****************************************************************
046     * Implement the FileSystem API for the raw local filesystem.
047     *
048     *****************************************************************/
049    @InterfaceAudience.Public
050    @InterfaceStability.Stable
051    public class RawLocalFileSystem extends FileSystem {
052      static final URI NAME = URI.create("file:///");
053      private Path workingDir;
054      
055      public RawLocalFileSystem() {
056        workingDir = getInitialWorkingDirectory();
057      }
058      
059      private Path makeAbsolute(Path f) {
060        if (f.isAbsolute()) {
061          return f;
062        } else {
063          return new Path(workingDir, f);
064        }
065      }
066      
067      /** Convert a path to a File. */
068      public File pathToFile(Path path) {
069        checkPath(path);
070        if (!path.isAbsolute()) {
071          path = new Path(getWorkingDirectory(), path);
072        }
073        return new File(path.toUri().getPath());
074      }
075    
076      @Override
077      public URI getUri() { return NAME; }
078      
079      @Override
080      public void initialize(URI uri, Configuration conf) throws IOException {
081        super.initialize(uri, conf);
082        setConf(conf);
083      }
084      
085      class TrackingFileInputStream extends FileInputStream {
086        public TrackingFileInputStream(File f) throws IOException {
087          super(f);
088        }
089        
090        @Override
091        public int read() throws IOException {
092          int result = super.read();
093          if (result != -1) {
094            statistics.incrementBytesRead(1);
095          }
096          return result;
097        }
098        
099        @Override
100        public int read(byte[] data) throws IOException {
101          int result = super.read(data);
102          if (result != -1) {
103            statistics.incrementBytesRead(result);
104          }
105          return result;
106        }
107        
108        @Override
109        public int read(byte[] data, int offset, int length) throws IOException {
110          int result = super.read(data, offset, length);
111          if (result != -1) {
112            statistics.incrementBytesRead(result);
113          }
114          return result;
115        }
116      }
117    
118      /*******************************************************
119       * For open()'s FSInputStream.
120       *******************************************************/
121      class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
122        private FileInputStream fis;
123        private long position;
124    
125        public LocalFSFileInputStream(Path f) throws IOException {
126          this.fis = new TrackingFileInputStream(pathToFile(f));
127        }
128        
129        @Override
130        public void seek(long pos) throws IOException {
131          fis.getChannel().position(pos);
132          this.position = pos;
133        }
134        
135        @Override
136        public long getPos() throws IOException {
137          return this.position;
138        }
139        
140        @Override
141        public boolean seekToNewSource(long targetPos) throws IOException {
142          return false;
143        }
144        
145        /*
146         * Just forward to the fis
147         */
148        @Override
149        public int available() throws IOException { return fis.available(); }
150        @Override
151        public void close() throws IOException { fis.close(); }
152        @Override
153        public boolean markSupported() { return false; }
154        
155        @Override
156        public int read() throws IOException {
157          try {
158            int value = fis.read();
159            if (value >= 0) {
160              this.position++;
161            }
162            return value;
163          } catch (IOException e) {                 // unexpected exception
164            throw new FSError(e);                   // assume native fs error
165          }
166        }
167        
168        @Override
169        public int read(byte[] b, int off, int len) throws IOException {
170          try {
171            int value = fis.read(b, off, len);
172            if (value > 0) {
173              this.position += value;
174            }
175            return value;
176          } catch (IOException e) {                 // unexpected exception
177            throw new FSError(e);                   // assume native fs error
178          }
179        }
180        
181        @Override
182        public int read(long position, byte[] b, int off, int len)
183          throws IOException {
184          ByteBuffer bb = ByteBuffer.wrap(b, off, len);
185          try {
186            return fis.getChannel().read(bb, position);
187          } catch (IOException e) {
188            throw new FSError(e);
189          }
190        }
191        
192        @Override
193        public long skip(long n) throws IOException {
194          long value = fis.skip(n);
195          if (value > 0) {
196            this.position += value;
197          }
198          return value;
199        }
200    
201        @Override
202        public FileDescriptor getFileDescriptor() throws IOException {
203          return fis.getFD();
204        }
205      }
206      
207      @Override
208      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
209        if (!exists(f)) {
210          throw new FileNotFoundException(f.toString());
211        }
212        return new FSDataInputStream(new BufferedFSInputStream(
213            new LocalFSFileInputStream(f), bufferSize));
214      }
215      
216      /*********************************************************
217       * For create()'s FSOutputStream.
218       *********************************************************/
219      class LocalFSFileOutputStream extends OutputStream {
220        private FileOutputStream fos;
221        
222        private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
223          this.fos = new FileOutputStream(pathToFile(f), append);
224        }
225        
226        /*
227         * Just forward to the fos
228         */
229        @Override
230        public void close() throws IOException { fos.close(); }
231        @Override
232        public void flush() throws IOException { fos.flush(); }
233        @Override
234        public void write(byte[] b, int off, int len) throws IOException {
235          try {
236            fos.write(b, off, len);
237          } catch (IOException e) {                // unexpected exception
238            throw new FSError(e);                  // assume native fs error
239          }
240        }
241        
242        @Override
243        public void write(int b) throws IOException {
244          try {
245            fos.write(b);
246          } catch (IOException e) {              // unexpected exception
247            throw new FSError(e);                // assume native fs error
248          }
249        }
250      }
251    
252      @Override
253      public FSDataOutputStream append(Path f, int bufferSize,
254          Progressable progress) throws IOException {
255        if (!exists(f)) {
256          throw new FileNotFoundException("File " + f + " not found");
257        }
258        if (getFileStatus(f).isDirectory()) {
259          throw new IOException("Cannot append to a diretory (=" + f + " )");
260        }
261        return new FSDataOutputStream(new BufferedOutputStream(
262            new LocalFSFileOutputStream(f, true), bufferSize), statistics);
263      }
264    
265      @Override
266      public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
267        short replication, long blockSize, Progressable progress)
268        throws IOException {
269        return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
270      }
271    
272      private FSDataOutputStream create(Path f, boolean overwrite,
273          boolean createParent, int bufferSize, short replication, long blockSize,
274          Progressable progress) throws IOException {
275        if (exists(f) && !overwrite) {
276          throw new IOException("File already exists: "+f);
277        }
278        Path parent = f.getParent();
279        if (parent != null && !mkdirs(parent)) {
280          throw new IOException("Mkdirs failed to create " + parent.toString());
281        }
282        return new FSDataOutputStream(new BufferedOutputStream(
283            new LocalFSFileOutputStream(f, false), bufferSize), statistics);
284      }
285      
286      @Override
287      @Deprecated
288      public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
289          EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
290          Progressable progress) throws IOException {
291        if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
292          throw new IOException("File already exists: "+f);
293        }
294        return new FSDataOutputStream(new BufferedOutputStream(
295            new LocalFSFileOutputStream(f, false), bufferSize), statistics);
296      }
297    
298      @Override
299      public FSDataOutputStream create(Path f, FsPermission permission,
300        boolean overwrite, int bufferSize, short replication, long blockSize,
301        Progressable progress) throws IOException {
302    
303        FSDataOutputStream out = create(f,
304            overwrite, bufferSize, replication, blockSize, progress);
305        setPermission(f, permission);
306        return out;
307      }
308    
309      @Override
310      public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
311          boolean overwrite,
312          int bufferSize, short replication, long blockSize,
313          Progressable progress) throws IOException {
314        FSDataOutputStream out = create(f,
315            overwrite, false, bufferSize, replication, blockSize, progress);
316        setPermission(f, permission);
317        return out;
318      }
319    
320      @Override
321      public boolean rename(Path src, Path dst) throws IOException {
322        if (pathToFile(src).renameTo(pathToFile(dst))) {
323          return true;
324        }
325        return FileUtil.copy(this, src, this, dst, true, getConf());
326      }
327      
328      /**
329       * Delete the given path to a file or directory.
330       * @param p the path to delete
331       * @param recursive to delete sub-directories
332       * @return true if the file or directory and all its contents were deleted
333       * @throws IOException if p is non-empty and recursive is false 
334       */
335      @Override
336      public boolean delete(Path p, boolean recursive) throws IOException {
337        File f = pathToFile(p);
338        if (f.isFile()) {
339          return f.delete();
340        } else if (!recursive && f.isDirectory() && 
341            (FileUtil.listFiles(f).length != 0)) {
342          throw new IOException("Directory " + f.toString() + " is not empty");
343        }
344        return FileUtil.fullyDelete(f);
345      }
346     
347      @Override
348      public FileStatus[] listStatus(Path f) throws IOException {
349        File localf = pathToFile(f);
350        FileStatus[] results;
351    
352        if (!localf.exists()) {
353          throw new FileNotFoundException("File " + f + " does not exist");
354        }
355        if (localf.isFile()) {
356          return new FileStatus[] {
357            new RawLocalFileStatus(localf, getDefaultBlockSize(f), this) };
358        }
359    
360        File[] names = localf.listFiles();
361        if (names == null) {
362          return null;
363        }
364        results = new FileStatus[names.length];
365        int j = 0;
366        for (int i = 0; i < names.length; i++) {
367          try {
368            results[j] = getFileStatus(new Path(names[i].getAbsolutePath()));
369            j++;
370          } catch (FileNotFoundException e) {
371            // ignore the files not found since the dir list may have have changed
372            // since the names[] list was generated.
373          }
374        }
375        if (j == names.length) {
376          return results;
377        }
378        return Arrays.copyOf(results, j);
379      }
380    
381      /**
382       * Creates the specified directory hierarchy. Does not
383       * treat existence as an error.
384       */
385      @Override
386      public boolean mkdirs(Path f) throws IOException {
387        if(f == null) {
388          throw new IllegalArgumentException("mkdirs path arg is null");
389        }
390        Path parent = f.getParent();
391        File p2f = pathToFile(f);
392        if(parent != null) {
393          File parent2f = pathToFile(parent);
394          if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
395            throw new FileAlreadyExistsException("Parent path is not a directory: " 
396                + parent);
397          }
398        }
399        return (parent == null || mkdirs(parent)) &&
400          (p2f.mkdir() || p2f.isDirectory());
401      }
402    
403      @Override
404      public boolean mkdirs(Path f, FsPermission permission) throws IOException {
405        boolean b = mkdirs(f);
406        if(b) {
407          setPermission(f, permission);
408        }
409        return b;
410      }
411      
412    
413      @Override
414      protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
415        throws IOException {
416        boolean b = mkdirs(f);
417        setPermission(f, absolutePermission);
418        return b;
419      }
420      
421      
422      @Override
423      public Path getHomeDirectory() {
424        return this.makeQualified(new Path(System.getProperty("user.home")));
425      }
426    
427      /**
428       * Set the working directory to the given directory.
429       */
430      @Override
431      public void setWorkingDirectory(Path newDir) {
432        workingDir = makeAbsolute(newDir);
433        checkPath(workingDir);
434        
435      }
436      
437      @Override
438      public Path getWorkingDirectory() {
439        return workingDir;
440      }
441      
442      @Override
443      protected Path getInitialWorkingDirectory() {
444        return this.makeQualified(new Path(System.getProperty("user.dir")));
445      }
446    
447      @Override
448      public FsStatus getStatus(Path p) throws IOException {
449        File partition = pathToFile(p == null ? new Path("/") : p);
450        //File provides getUsableSpace() and getFreeSpace()
451        //File provides no API to obtain used space, assume used = total - free
452        return new FsStatus(partition.getTotalSpace(), 
453          partition.getTotalSpace() - partition.getFreeSpace(),
454          partition.getFreeSpace());
455      }
456      
457      // In the case of the local filesystem, we can just rename the file.
458      @Override
459      public void moveFromLocalFile(Path src, Path dst) throws IOException {
460        rename(src, dst);
461      }
462      
463      // We can write output directly to the final location
464      @Override
465      public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
466        throws IOException {
467        return fsOutputFile;
468      }
469      
470      // It's in the right place - nothing to do.
471      @Override
472      public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
473        throws IOException {
474      }
475      
476      @Override
477      public void close() throws IOException {
478        super.close();
479      }
480      
481      @Override
482      public String toString() {
483        return "LocalFS";
484      }
485      
486      @Override
487      public FileStatus getFileStatus(Path f) throws IOException {
488        File path = pathToFile(f);
489        if (path.exists()) {
490          return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(f), this);
491        } else {
492          throw new FileNotFoundException("File " + f + " does not exist");
493        }
494      }
495    
496      static class RawLocalFileStatus extends FileStatus {
497        /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
498         * We recognize if the information is already loaded by check if
499         * onwer.equals("").
500         */
501        private boolean isPermissionLoaded() {
502          return !super.getOwner().equals(""); 
503        }
504        
505        RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
506          super(f.length(), f.isDirectory(), 1, defaultBlockSize,
507                f.lastModified(), fs.makeQualified(new Path(f.getPath())));
508        }
509        
510        @Override
511        public FsPermission getPermission() {
512          if (!isPermissionLoaded()) {
513            loadPermissionInfo();
514          }
515          return super.getPermission();
516        }
517    
518        @Override
519        public String getOwner() {
520          if (!isPermissionLoaded()) {
521            loadPermissionInfo();
522          }
523          return super.getOwner();
524        }
525    
526        @Override
527        public String getGroup() {
528          if (!isPermissionLoaded()) {
529            loadPermissionInfo();
530          }
531          return super.getGroup();
532        }
533    
534        /// loads permissions, owner, and group from `ls -ld`
535        private void loadPermissionInfo() {
536          IOException e = null;
537          try {
538            StringTokenizer t = new StringTokenizer(
539                execCommand(new File(getPath().toUri()), 
540                            Shell.getGET_PERMISSION_COMMAND()));
541            //expected format
542            //-rw-------    1 username groupname ...
543            String permission = t.nextToken();
544            if (permission.length() > 10) { //files with ACLs might have a '+'
545              permission = permission.substring(0, 10);
546            }
547            setPermission(FsPermission.valueOf(permission));
548            t.nextToken();
549            setOwner(t.nextToken());
550            setGroup(t.nextToken());
551          } catch (Shell.ExitCodeException ioe) {
552            if (ioe.getExitCode() != 1) {
553              e = ioe;
554            } else {
555              setPermission(null);
556              setOwner(null);
557              setGroup(null);
558            }
559          } catch (IOException ioe) {
560            e = ioe;
561          } finally {
562            if (e != null) {
563              throw new RuntimeException("Error while running command to get " +
564                                         "file permissions : " + 
565                                         StringUtils.stringifyException(e));
566            }
567          }
568        }
569    
570        @Override
571        public void write(DataOutput out) throws IOException {
572          if (!isPermissionLoaded()) {
573            loadPermissionInfo();
574          }
575          super.write(out);
576        }
577      }
578    
579      /**
580       * Use the command chown to set owner.
581       */
582      @Override
583      public void setOwner(Path p, String username, String groupname)
584        throws IOException {
585        if (username == null && groupname == null) {
586          throw new IOException("username == null && groupname == null");
587        }
588    
589        if (username == null) {
590          execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
591        } else {
592          //OWNER[:[GROUP]]
593          String s = username + (groupname == null? "": ":" + groupname);
594          execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
595        }
596      }
597    
598      /**
599       * Use the command chmod to set permission.
600       */
601      @Override
602      public void setPermission(Path p, FsPermission permission)
603        throws IOException {
604        if (NativeIO.isAvailable()) {
605          NativeIO.chmod(pathToFile(p).getCanonicalPath(),
606                         permission.toShort());
607        } else {
608          execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
609              String.format("%05o", permission.toShort()));
610        }
611      }
612    
613      private static String execCommand(File f, String... cmd) throws IOException {
614        String[] args = new String[cmd.length + 1];
615        System.arraycopy(cmd, 0, args, 0, cmd.length);
616        args[cmd.length] = FileUtil.makeShellPath(f, true);
617        String output = Shell.execCommand(args);
618        return output;
619      }
620    
621    }