001    /**
002     *
003     * Licensed under the Apache License, Version 2.0
004     * (the "License"); you may not use this file except in compliance with
005     * the License. You may obtain a copy of the License at
006     *
007     * http://www.apache.org/licenses/LICENSE-2.0
008     *
009     * Unless required by applicable law or agreed to in writing, software
010     * distributed under the License is distributed on an "AS IS" BASIS,
011     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
012     * implied. See the License for the specific language governing
013     * permissions and limitations under the License.
014     *
015     * 
016     * Implements the Hadoop FS interfaces to allow applications to store
017     * files in the Kosmos File System (KFS).
018     */
019    
020    package org.apache.hadoop.fs.kfs;
021    
022    import java.io.FileNotFoundException;
023    import java.io.IOException;
024    import java.net.URI;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.BlockLocation;
030    import org.apache.hadoop.fs.FSDataInputStream;
031    import org.apache.hadoop.fs.FSDataOutputStream;
032    import org.apache.hadoop.fs.FileStatus;
033    import org.apache.hadoop.fs.FileSystem;
034    import org.apache.hadoop.fs.FileUtil;
035    import org.apache.hadoop.fs.Path;
036    import org.apache.hadoop.fs.permission.FsPermission;
037    import org.apache.hadoop.util.Progressable;
038    
039    /**
040     * A FileSystem backed by KFS.
041     *
042     */
043    @InterfaceAudience.Public
044    @InterfaceStability.Stable
045    public class KosmosFileSystem extends FileSystem {
046    
047        private FileSystem localFs;
048        private IFSImpl kfsImpl = null;
049        private URI uri;
050        private Path workingDir = new Path("/");
051    
052        public KosmosFileSystem() {
053    
054        }
055    
056        KosmosFileSystem(IFSImpl fsimpl) {
057            this.kfsImpl = fsimpl;
058        }
059    
060        /**
061         * Return the protocol scheme for the FileSystem.
062         * <p/>
063         *
064         * @return <code>kfs</code>
065         */
066        @Override
067        public String getScheme() {
068          return "kfs";
069        }
070    
071        @Override
072        public URI getUri() {
073            return uri;
074        }
075    
076        @Override
077        public void initialize(URI uri, Configuration conf) throws IOException {
078          super.initialize(uri, conf);
079          try {
080            if (kfsImpl == null) {
081              if (uri.getHost() == null) {
082                kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
083                                      conf.getInt("fs.kfs.metaServerPort", -1),
084                                      statistics);
085              } else {
086                kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
087              }
088            }
089    
090            this.localFs = FileSystem.getLocal(conf);
091            this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
092            this.workingDir = new Path("/user", System.getProperty("user.name")
093                                       ).makeQualified(this);
094            setConf(conf);
095    
096          } catch (Exception e) {
097            e.printStackTrace();
098            System.out.println("Unable to initialize KFS");
099            System.exit(-1);
100          }
101        }
102    
103        @Override
104        public Path getWorkingDirectory() {
105            return workingDir;
106        }
107    
108        @Override
109        public void setWorkingDirectory(Path dir) {
110            workingDir = makeAbsolute(dir);
111        }
112    
113        private Path makeAbsolute(Path path) {
114            if (path.isAbsolute()) {
115                return path;
116            }
117            return new Path(workingDir, path);
118        }
119    
120        @Override
121        public boolean mkdirs(Path path, FsPermission permission
122            ) throws IOException {
123            Path absolute = makeAbsolute(path);
124            String srep = absolute.toUri().getPath();
125    
126            int res;
127    
128            // System.out.println("Calling mkdirs on: " + srep);
129    
130            res = kfsImpl.mkdirs(srep);
131            
132            return res == 0;
133        }
134    
135        @Override
136        public boolean isDirectory(Path path) throws IOException {
137            Path absolute = makeAbsolute(path);
138            String srep = absolute.toUri().getPath();
139    
140            // System.out.println("Calling isdir on: " + srep);
141    
142            return kfsImpl.isDirectory(srep);
143        }
144    
145        @Override
146        public boolean isFile(Path path) throws IOException {
147            Path absolute = makeAbsolute(path);
148            String srep = absolute.toUri().getPath();
149            return kfsImpl.isFile(srep);
150        }
151    
152        @Override
153        public FileStatus[] listStatus(Path path) throws IOException {
154            Path absolute = makeAbsolute(path);
155            String srep = absolute.toUri().getPath();
156    
157            if(!kfsImpl.exists(srep))
158              throw new FileNotFoundException("File " + path + " does not exist.");
159    
160            if (kfsImpl.isFile(srep))
161                    return new FileStatus[] { getFileStatus(path) } ;
162    
163            return kfsImpl.readdirplus(absolute);
164        }
165    
166        @Override
167        public FileStatus getFileStatus(Path path) throws IOException {
168            Path absolute = makeAbsolute(path);
169            String srep = absolute.toUri().getPath();
170            if (!kfsImpl.exists(srep)) {
171              throw new FileNotFoundException("File " + path + " does not exist.");
172            }
173            if (kfsImpl.isDirectory(srep)) {
174                // System.out.println("Status of path: " + path + " is dir");
175                return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep), 
176                                      path.makeQualified(this));
177            } else {
178                // System.out.println("Status of path: " + path + " is file");
179                return new FileStatus(kfsImpl.filesize(srep), false, 
180                                      kfsImpl.getReplication(srep),
181                                      getDefaultBlockSize(),
182                                      kfsImpl.getModificationTime(srep),
183                                      path.makeQualified(this));
184            }
185        }
186        
187        @Override
188        public FSDataOutputStream append(Path f, int bufferSize,
189            Progressable progress) throws IOException {
190            Path parent = f.getParent();
191            if (parent != null && !mkdirs(parent)) {
192                throw new IOException("Mkdirs failed to create " + parent);
193            }
194    
195            Path absolute = makeAbsolute(f);
196            String srep = absolute.toUri().getPath();
197    
198            return kfsImpl.append(srep, bufferSize, progress);
199        }
200    
201        @Override
202        public FSDataOutputStream create(Path file, FsPermission permission,
203                                         boolean overwrite, int bufferSize,
204                                         short replication, long blockSize, Progressable progress)
205            throws IOException {
206    
207            if (exists(file)) {
208                if (overwrite) {
209                    delete(file, true);
210                } else {
211                    throw new IOException("File already exists: " + file);
212                }
213            }
214    
215            Path parent = file.getParent();
216            if (parent != null && !mkdirs(parent)) {
217                throw new IOException("Mkdirs failed to create " + parent);
218            }
219    
220            Path absolute = makeAbsolute(file);
221            String srep = absolute.toUri().getPath();
222    
223            return kfsImpl.create(srep, replication, bufferSize, progress);
224        }
225    
226        @Override
227        public FSDataInputStream open(Path path, int bufferSize) throws IOException {
228            if (!exists(path))
229                throw new IOException("File does not exist: " + path);
230    
231            Path absolute = makeAbsolute(path);
232            String srep = absolute.toUri().getPath();
233    
234            return kfsImpl.open(srep, bufferSize);
235        }
236    
237        @Override
238        public boolean rename(Path src, Path dst) throws IOException {
239            Path absoluteS = makeAbsolute(src);
240            String srepS = absoluteS.toUri().getPath();
241            Path absoluteD = makeAbsolute(dst);
242            String srepD = absoluteD.toUri().getPath();
243    
244            // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
245    
246            return kfsImpl.rename(srepS, srepD) == 0;
247        }
248    
249        // recursively delete the directory and its contents
250        @Override
251        public boolean delete(Path path, boolean recursive) throws IOException {
252          Path absolute = makeAbsolute(path);
253          String srep = absolute.toUri().getPath();
254          if (kfsImpl.isFile(srep))
255            return kfsImpl.remove(srep) == 0;
256    
257          FileStatus[] dirEntries = listStatus(absolute);
258          if (!recursive && (dirEntries.length != 0)) {
259            throw new IOException("Directory " + path.toString() + 
260            " is not empty.");
261          }
262    
263          for (int i = 0; i < dirEntries.length; i++) {
264            delete(new Path(absolute, dirEntries[i].getPath()), recursive);
265          }
266          return kfsImpl.rmdir(srep) == 0;
267        }
268        
269        @Override
270        public short getDefaultReplication() {
271            return 3;
272        }
273    
274        @Override
275        public boolean setReplication(Path path, short replication)
276            throws IOException {
277    
278            Path absolute = makeAbsolute(path);
279            String srep = absolute.toUri().getPath();
280    
281            int res = kfsImpl.setReplication(srep, replication);
282            return res >= 0;
283        }
284    
285        // 64MB is the KFS block size
286    
287        @Override
288        public long getDefaultBlockSize() {
289            return 1 << 26;
290        }
291    
292        @Deprecated            
293        public void lock(Path path, boolean shared) throws IOException {
294    
295        }
296    
297        @Deprecated            
298        public void release(Path path) throws IOException {
299    
300        }
301    
302        /**
303         * Return null if the file doesn't exist; otherwise, get the
304         * locations of the various chunks of the file file from KFS.
305         */
306        @Override
307        public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
308            long len) throws IOException {
309    
310          if (file == null) {
311            return null;
312          }
313          String srep = makeAbsolute(file.getPath()).toUri().getPath();
314          String[][] hints = kfsImpl.getDataLocation(srep, start, len);
315          if (hints == null) {
316            return null;
317          }
318          BlockLocation[] result = new BlockLocation[hints.length];
319          long blockSize = getDefaultBlockSize();
320          long length = len;
321          long blockStart = start;
322          for(int i=0; i < result.length; ++i) {
323            result[i] = new BlockLocation(null, hints[i], blockStart, 
324                                          length < blockSize ? length : blockSize);
325            blockStart += blockSize;
326            length -= blockSize;
327          }
328          return result;
329        }
330    
331        @Override
332        public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
333            FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
334        }
335    
336        @Override
337        public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
338            FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
339        }
340    
341        @Override
342        public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
343            throws IOException {
344            return tmpLocalFile;
345        }
346    
347        @Override
348        public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
349            throws IOException {
350            moveFromLocalFile(tmpLocalFile, fsOutputFile);
351        }
352    }