001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.output;
020    
021    import java.io.IOException;
022    
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceAudience.Private;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.fs.FileStatus;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.hadoop.fs.PathFilter;
032    import org.apache.hadoop.mapreduce.JobContext;
033    import org.apache.hadoop.mapreduce.JobStatus;
034    import org.apache.hadoop.mapreduce.MRJobConfig;
035    import org.apache.hadoop.mapreduce.OutputCommitter;
036    import org.apache.hadoop.mapreduce.TaskAttemptContext;
037    import org.apache.hadoop.mapreduce.TaskAttemptID;
038    
039    /** An {@link OutputCommitter} that commits files specified 
040     * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041     **/
042    @InterfaceAudience.Public
043    @InterfaceStability.Stable
044    public class FileOutputCommitter extends OutputCommitter {
045      private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
046    
047      /** 
048       * Name of directory where pending data is placed.  Data that has not been
049       * committed yet.
050       */
051      public static final String PENDING_DIR_NAME = "_temporary";
052      public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
053      public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
054        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
055      private Path outputPath = null;
056      private Path workPath = null;
057    
058      /**
059       * Create a file output committer
060       * @param outputPath the job's output path, or null if you want the output
061       * committer to act as a noop.
062       * @param context the task's context
063       * @throws IOException
064       */
065      public FileOutputCommitter(Path outputPath, 
066                                 TaskAttemptContext context) throws IOException {
067        this(outputPath, (JobContext)context);
068        if (outputPath != null) {
069          workPath = getTaskAttemptPath(context, outputPath);
070        }
071      }
072      
073      /**
074       * Create a file output committer
075       * @param outputPath the job's output path, or null if you want the output
076       * committer to act as a noop.
077       * @param context the task's context
078       * @throws IOException
079       */
080      @Private
081      public FileOutputCommitter(Path outputPath, 
082                                 JobContext context) throws IOException {
083        if (outputPath != null) {
084          FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
085          this.outputPath = fs.makeQualified(outputPath);
086        }
087      }
088      
089      /**
090       * @return the path where final output of the job should be placed.  This
091       * could also be considered the committed application attempt path.
092       */
093      private Path getOutputPath() {
094        return this.outputPath;
095      }
096      
097      /**
098       * @return true if we have an output path set, else false.
099       */
100      private boolean hasOutputPath() {
101        return this.outputPath != null;
102      }
103      
104      /**
105       * @return the path where the output of pending job attempts are
106       * stored.
107       */
108      private Path getPendingJobAttemptsPath() {
109        return getPendingJobAttemptsPath(getOutputPath());
110      }
111      
112      /**
113       * Get the location of pending job attempts.
114       * @param out the base output directory.
115       * @return the location of pending job attempts.
116       */
117      private static Path getPendingJobAttemptsPath(Path out) {
118        return new Path(out, PENDING_DIR_NAME);
119      }
120      
121      /**
122       * Get the Application Attempt Id for this job
123       * @param context the context to look in
124       * @return the Application Attempt Id for a given job.
125       */
126      private static int getAppAttemptId(JobContext context) {
127        return context.getConfiguration().getInt(
128            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
129      }
130      
131      /**
132       * Compute the path where the output of a given job attempt will be placed. 
133       * @param context the context of the job.  This is used to get the
134       * application attempt id.
135       * @return the path to store job attempt data.
136       */
137      public Path getJobAttemptPath(JobContext context) {
138        return getJobAttemptPath(context, getOutputPath());
139      }
140      
141      /**
142       * Compute the path where the output of a given job attempt will be placed. 
143       * @param context the context of the job.  This is used to get the
144       * application attempt id.
145       * @param out the output path to place these in.
146       * @return the path to store job attempt data.
147       */
148      public static Path getJobAttemptPath(JobContext context, Path out) {
149        return getJobAttemptPath(getAppAttemptId(context), out);
150      }
151      
152      /**
153       * Compute the path where the output of a given job attempt will be placed. 
154       * @param appAttemptId the ID of the application attempt for this job.
155       * @return the path to store job attempt data.
156       */
157      private Path getJobAttemptPath(int appAttemptId) {
158        return getJobAttemptPath(appAttemptId, getOutputPath());
159      }
160      
161      /**
162       * Compute the path where the output of a given job attempt will be placed. 
163       * @param appAttemptId the ID of the application attempt for this job.
164       * @return the path to store job attempt data.
165       */
166      private static Path getJobAttemptPath(int appAttemptId, Path out) {
167        return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
168      }
169      
170      /**
171       * Compute the path where the output of pending task attempts are stored.
172       * @param context the context of the job with pending tasks. 
173       * @return the path where the output of pending task attempts are stored.
174       */
175      private Path getPendingTaskAttemptsPath(JobContext context) {
176        return getPendingTaskAttemptsPath(context, getOutputPath());
177      }
178      
179      /**
180       * Compute the path where the output of pending task attempts are stored.
181       * @param context the context of the job with pending tasks. 
182       * @return the path where the output of pending task attempts are stored.
183       */
184      private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
185        return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
186      }
187      
188      /**
189       * Compute the path where the output of a task attempt is stored until
190       * that task is committed.
191       * 
192       * @param context the context of the task attempt.
193       * @return the path where a task attempt should be stored.
194       */
195      public Path getTaskAttemptPath(TaskAttemptContext context) {
196        return new Path(getPendingTaskAttemptsPath(context), 
197            String.valueOf(context.getTaskAttemptID()));
198      }
199      
200      /**
201       * Compute the path where the output of a task attempt is stored until
202       * that task is committed.
203       * 
204       * @param context the context of the task attempt.
205       * @param out The output path to put things in.
206       * @return the path where a task attempt should be stored.
207       */
208      public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
209        return new Path(getPendingTaskAttemptsPath(context, out), 
210            String.valueOf(context.getTaskAttemptID()));
211      }
212      
213      /**
214       * Compute the path where the output of a committed task is stored until
215       * the entire job is committed.
216       * @param context the context of the task attempt
217       * @return the path where the output of a committed task is stored until
218       * the entire job is committed.
219       */
220      public Path getCommittedTaskPath(TaskAttemptContext context) {
221        return getCommittedTaskPath(getAppAttemptId(context), context);
222      }
223      
224      public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
225        return getCommittedTaskPath(getAppAttemptId(context), context, out);
226      }
227      
228      /**
229       * Compute the path where the output of a committed task is stored until the
230       * entire job is committed for a specific application attempt.
231       * @param appAttemptId the id of the application attempt to use
232       * @param context the context of any task.
233       * @return the path where the output of a committed task is stored.
234       */
235      private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
236        return new Path(getJobAttemptPath(appAttemptId),
237            String.valueOf(context.getTaskAttemptID().getTaskID()));
238      }
239      
240      private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
241        return new Path(getJobAttemptPath(appAttemptId, out),
242            String.valueOf(context.getTaskAttemptID().getTaskID()));
243      }
244      
245      private static class CommittedTaskFilter implements PathFilter {
246        @Override
247        public boolean accept(Path path) {
248          return !PENDING_DIR_NAME.equals(path.getName());
249        }
250      }
251      
252      /**
253       * Get a list of all paths where output from committed tasks are stored.
254       * @param context the context of the current job
255       * @return the list of these Paths/FileStatuses. 
256       * @throws IOException
257       */
258      private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
259        throws IOException {
260        Path jobAttemptPath = getJobAttemptPath(context);
261        FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
262        return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
263      }
264      
265      /**
266       * Get the directory that the task should write results into.
267       * @return the work directory
268       * @throws IOException
269       */
270      public Path getWorkPath() throws IOException {
271        return workPath;
272      }
273    
274      /**
275       * Create the temporary directory that is the root of all of the task 
276       * work directories.
277       * @param context the job's context
278       */
279      public void setupJob(JobContext context) throws IOException {
280        if (hasOutputPath()) {
281          Path jobAttemptPath = getJobAttemptPath(context);
282          FileSystem fs = jobAttemptPath.getFileSystem(
283              context.getConfiguration());
284          if (!fs.mkdirs(jobAttemptPath)) {
285            LOG.error("Mkdirs failed to create " + jobAttemptPath);
286          }
287        } else {
288          LOG.warn("Output Path is null in setupJob()");
289        }
290      }
291      
292      /**
293       * The job has completed so move all committed tasks to the final output dir.
294       * Delete the temporary directory, including all of the work directories.
295       * Create a _SUCCESS file to make it as successful.
296       * @param context the job's context
297       */
298      public void commitJob(JobContext context) throws IOException {
299        if (hasOutputPath()) {
300          Path finalOutput = getOutputPath();
301          FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
302          for(FileStatus stat: getAllCommittedTaskPaths(context)) {
303            mergePaths(fs, stat, finalOutput);
304          }
305    
306          // delete the _temporary folder and create a _done file in the o/p folder
307          cleanupJob(context);
308          // True if the job requires output.dir marked on successful job.
309          // Note that by default it is set to true.
310          if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
311            Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
312            fs.create(markerPath).close();
313          }
314        } else {
315          LOG.warn("Output Path is null in commitJob()");
316        }
317      }
318    
319      /**
320       * Merge two paths together.  Anything in from will be moved into to, if there
321       * are any name conflicts while merging the files or directories in from win.
322       * @param fs the File System to use
323       * @param from the path data is coming from.
324       * @param to the path data is going to.
325       * @throws IOException on any error
326       */
327      private static void mergePaths(FileSystem fs, final FileStatus from,
328          final Path to)
329        throws IOException {
330         LOG.debug("Merging data from "+from+" to "+to);
331         if(from.isFile()) {
332           if(fs.exists(to)) {
333             if(!fs.delete(to, true)) {
334               throw new IOException("Failed to delete "+to);
335             }
336           }
337    
338           if(!fs.rename(from.getPath(), to)) {
339             throw new IOException("Failed to rename "+from+" to "+to);
340           }
341         } else if(from.isDirectory()) {
342           if(fs.exists(to)) {
343             FileStatus toStat = fs.getFileStatus(to);
344             if(!toStat.isDirectory()) {
345               if(!fs.delete(to, true)) {
346                 throw new IOException("Failed to delete "+to);
347               }
348               if(!fs.rename(from.getPath(), to)) {
349                 throw new IOException("Failed to rename "+from+" to "+to);
350               }
351             } else {
352               //It is a directory so merge everything in the directories
353               for(FileStatus subFrom: fs.listStatus(from.getPath())) {
354                 Path subTo = new Path(to, subFrom.getPath().getName());
355                 mergePaths(fs, subFrom, subTo);
356               }
357             }
358           } else {
359             //it does not exist just rename
360             if(!fs.rename(from.getPath(), to)) {
361               throw new IOException("Failed to rename "+from+" to "+to);
362             }
363           }
364         }
365      }
366    
367      @Override
368      @Deprecated
369      public void cleanupJob(JobContext context) throws IOException {
370        if (hasOutputPath()) {
371          Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
372          FileSystem fs = pendingJobAttemptsPath
373              .getFileSystem(context.getConfiguration());
374          fs.delete(pendingJobAttemptsPath, true);
375        } else {
376          LOG.warn("Output Path is null in cleanupJob()");
377        }
378      }
379    
380      /**
381       * Delete the temporary directory, including all of the work directories.
382       * @param context the job's context
383       */
384      @Override
385      public void abortJob(JobContext context, JobStatus.State state) 
386      throws IOException {
387        // delete the _temporary folder
388        cleanupJob(context);
389      }
390      
391      /**
392       * No task setup required.
393       */
394      @Override
395      public void setupTask(TaskAttemptContext context) throws IOException {
396        // FileOutputCommitter's setupTask doesn't do anything. Because the
397        // temporary task directory is created on demand when the 
398        // task is writing.
399      }
400    
401      /**
402       * Move the files from the work directory to the job output directory
403       * @param context the task context
404       */
405      @Override
406      public void commitTask(TaskAttemptContext context) 
407      throws IOException {
408        commitTask(context, null);
409      }
410    
411      @Private
412      public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
413      throws IOException {
414        TaskAttemptID attemptId = context.getTaskAttemptID();
415        if (hasOutputPath()) {
416          context.progress();
417          if(taskAttemptPath == null) {
418            taskAttemptPath = getTaskAttemptPath(context);
419          }
420          Path committedTaskPath = getCommittedTaskPath(context);
421          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
422          if (fs.exists(taskAttemptPath)) {
423            if(fs.exists(committedTaskPath)) {
424              if(!fs.delete(committedTaskPath, true)) {
425                throw new IOException("Could not delete " + committedTaskPath);
426              }
427            }
428            if(!fs.rename(taskAttemptPath, committedTaskPath)) {
429              throw new IOException("Could not rename " + taskAttemptPath + " to "
430                  + committedTaskPath);
431            }
432            LOG.info("Saved output of task '" + attemptId + "' to " + 
433                committedTaskPath);
434          } else {
435            LOG.warn("No Output found for " + attemptId);
436          }
437        } else {
438          LOG.warn("Output Path is null in commitTask()");
439        }
440      }
441    
442      /**
443       * Delete the work directory
444       * @throws IOException 
445       */
446      @Override
447      public void abortTask(TaskAttemptContext context) throws IOException {
448        abortTask(context, null);
449      }
450    
451      @Private
452      public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
453        if (hasOutputPath()) { 
454          context.progress();
455          if(taskAttemptPath == null) {
456            taskAttemptPath = getTaskAttemptPath(context);
457          }
458          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
459          if(!fs.delete(taskAttemptPath, true)) {
460            LOG.warn("Could not delete "+taskAttemptPath);
461          }
462        } else {
463          LOG.warn("Output Path is null in abortTask()");
464        }
465      }
466    
467      /**
468       * Did this task write any files in the work directory?
469       * @param context the task's context
470       */
471      @Override
472      public boolean needsTaskCommit(TaskAttemptContext context
473                                     ) throws IOException {
474        return needsTaskCommit(context, null);
475      }
476    
477      @Private
478      public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
479        ) throws IOException {
480        if(hasOutputPath()) {
481          if(taskAttemptPath == null) {
482            taskAttemptPath = getTaskAttemptPath(context);
483          }
484          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
485          return fs.exists(taskAttemptPath);
486        }
487        return false;
488      }
489    
490      @Override
491      public boolean isRecoverySupported() {
492        return true;
493      }
494      
495      @Override
496      public void recoverTask(TaskAttemptContext context)
497          throws IOException {
498        if(hasOutputPath()) {
499          context.progress();
500          TaskAttemptID attemptId = context.getTaskAttemptID();
501          int previousAttempt = getAppAttemptId(context) - 1;
502          if (previousAttempt < 0) {
503            throw new IOException ("Cannot recover task output for first attempt...");
504          }
505    
506          Path committedTaskPath = getCommittedTaskPath(context);
507          Path previousCommittedTaskPath = getCommittedTaskPath(
508              previousAttempt, context);
509          FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
510    
511          LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
512              + " into " + committedTaskPath);
513          if (fs.exists(previousCommittedTaskPath)) {
514            if(fs.exists(committedTaskPath)) {
515              if(!fs.delete(committedTaskPath, true)) {
516                throw new IOException("Could not delete "+committedTaskPath);
517              }
518            }
519            //Rename can fail if the parent directory does not yet exist.
520            Path committedParent = committedTaskPath.getParent();
521            fs.mkdirs(committedParent);
522            if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
523              throw new IOException("Could not rename " + previousCommittedTaskPath +
524                  " to " + committedTaskPath);
525            }
526            LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
527          } else {
528            LOG.warn(attemptId+" had no output to recover.");
529          }
530        } else {
531          LOG.warn("Output Path is null in recoverTask()");
532        }
533      }
534    }