001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce;
020    
021    import java.io.IOException;
022    import java.net.URI;
023    import java.security.PrivilegedExceptionAction;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.classification.InterfaceAudience.Private;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.conf.Configuration.IntegerRanges;
032    import org.apache.hadoop.fs.FileSystem;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.io.RawComparator;
035    import org.apache.hadoop.mapred.JobConf;
036    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
037    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038    import org.apache.hadoop.mapreduce.task.JobContextImpl;
039    import org.apache.hadoop.mapreduce.util.ConfigUtil;
040    import org.apache.hadoop.util.StringUtils;
041    
042    /**
043     * The job submitter's view of the Job.
044     * 
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an 
 * {@link IllegalStateException}. </p>
049     * 
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
053     * 
054     * <p>Here is an example on how to submit a job:</p>
055     * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
071     * </pre></blockquote></p>
072     * 
073     * 
074     */
075    @InterfaceAudience.Public
076    @InterfaceStability.Evolving
077    public class Job extends JobContextImpl implements JobContext {  
078      private static final Log LOG = LogFactory.getLog(Job.class);
079    
080      @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}
  /** Maximum age, in milliseconds, of a cached job status before it is
   *  refreshed; see {@link #ensureFreshStatus()}. */
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
083      public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
085      public static final String COMPLETION_POLL_INTERVAL_KEY = 
086        "mapreduce.client.completion.pollinterval";
087      
088      /** Default completionPollIntervalMillis is 5000 ms. */
089      static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
090      /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
091      public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
092        "mapreduce.client.progressmonitor.pollinterval";
093      /** Default progMonitorPollIntervalMillis is 1000 ms. */
094      static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
095    
096      public static final String USED_GENERIC_PARSER = 
097        "mapreduce.client.genericoptionsparser.used";
098      public static final String SUBMIT_REPLICATION = 
099        "mapreduce.client.submit.file.replication";
100      private static final String TASKLOG_PULL_TIMEOUT_KEY =
101               "mapreduce.client.tasklog.timeout";
102      private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
103    
104      @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
106    
107      static {
108        ConfigUtil.loadResources();
109      }
110    
111      private JobState state = JobState.DEFINE;
112      private JobStatus status;
113      private long statustime;
114      private Cluster cluster;
115    
116      @Deprecated
117      public Job() throws IOException {
118        this(new Configuration());
119      }
120    
121      @Deprecated
122      public Job(Configuration conf) throws IOException {
123        this(new JobConf(conf));
124      }
125    
126      @Deprecated
127      public Job(Configuration conf, String jobName) throws IOException {
128        this(conf);
129        setJobName(jobName);
130      }
131    
132      Job(JobConf conf) throws IOException {
133        super(conf, null);
134        // propagate existing user credentials to job
135        this.credentials.mergeAll(this.ugi.getCredentials());
136        this.cluster = null;
137      }
138    
139      Job(JobStatus status, JobConf conf) throws IOException {
140        this(conf);
141        setJobID(status.getJobID());
142        this.status = status;
143        state = JobState.RUNNING;
144      }
145    
146          
147      /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   * 
   * @return the {@link Job}, with no connection to a cluster yet.
152       * @throws IOException
153       */
154      public static Job getInstance() throws IOException {
155        // create with a null Cluster
156        return getInstance(new Configuration());
157      }
158          
159      /**
160       * Creates a new {@link Job} with no particular {@link Cluster} and a 
161       * given {@link Configuration}.
162       * 
163       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
164       * that any necessary internal modifications do not reflect on the incoming 
165       * parameter.
166       * 
167       * A Cluster will be created from the conf parameter only when it's needed.
168       * 
169       * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
171       * @throws IOException
172       */
173      public static Job getInstance(Configuration conf) throws IOException {
174        // create with a null Cluster
175        JobConf jobConf = new JobConf(conf);
176        return new Job(jobConf);
177      }
178    
179          
180      /**
181       * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
182       * A Cluster will be created from the conf parameter only when it's needed.
183       *
184       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
185       * that any necessary internal modifications do not reflect on the incoming 
186       * parameter.
187       * 
   * @param conf the configuration
   * @param jobName the job instance's name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
191       */
192      public static Job getInstance(Configuration conf, String jobName)
193               throws IOException {
194        // create with a null Cluster
195        Job result = getInstance(conf);
196        result.setJobName(jobName);
197        return result;
198      }
199      
200      /**
201       * Creates a new {@link Job} with no particular {@link Cluster} and given
202       * {@link Configuration} and {@link JobStatus}.
203       * A Cluster will be created from the conf parameter only when it's needed.
204       * 
205       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
206       * that any necessary internal modifications do not reflect on the incoming 
207       * parameter.
208       * 
209       * @param status job status
210       * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
212       * @throws IOException
213       */
  public static Job getInstance(JobStatus status, Configuration conf) 
      throws IOException {
216        return new Job(status, new JobConf(conf));
217      }
218    
219      /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from a generic {@link Configuration} only
   * when it's needed.
222       *
223       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
224       * that any necessary internal modifications do not reflect on the incoming 
225       * parameter.
226       * 
   * @param ignored the ignored cluster argument; retained for compatibility
   * @return the {@link Job}, with no connection to a cluster yet.
229       * @throws IOException
230       * @deprecated Use {@link #getInstance()}
231       */
232      @Deprecated
233      public static Job getInstance(Cluster ignored) throws IOException {
234        return getInstance();
235      }
236      
237      /**
238       * Creates a new {@link Job} with no particular {@link Cluster} and given
239       * {@link Configuration}.
240       * A Cluster will be created from the conf parameter only when it's needed.
241       * 
242       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
243       * that any necessary internal modifications do not reflect on the incoming 
244       * parameter.
245       * 
   * @param ignored the ignored cluster argument; retained for compatibility
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
249       * @throws IOException
250       * @deprecated Use {@link #getInstance(Configuration)}
251       */
252      @Deprecated
253      public static Job getInstance(Cluster ignored, Configuration conf) 
254          throws IOException {
255        return getInstance(conf);
256      }
257      
258      /**
   * Creates a new {@link Job} with the given {@link Cluster},
   * {@link Configuration} and {@link JobStatus}.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param cluster the cluster to attach the job to
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, attached to the given cluster.
271       * @throws IOException
272       */
273      @Private
274      public static Job getInstance(Cluster cluster, JobStatus status, 
275          Configuration conf) throws IOException {
276        Job job = getInstance(status, conf);
277        job.setCluster(cluster);
278        return job;
279      }
280    
281      private void ensureState(JobState state) throws IllegalStateException {
282        if (state != this.state) {
283          throw new IllegalStateException("Job in state "+ this.state + 
284                                          " instead of " + state);
285        }
286    
287        if (state == JobState.RUNNING && cluster == null) {
288          throw new IllegalStateException
289            ("Job in state " + this.state
290             + ", but it isn't attached to any job tracker!");
291        }
292      }
293    
294      /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary.
297       */
298      synchronized void ensureFreshStatus() 
299          throws IOException, InterruptedException {
300        if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
301          updateStatus();
302        }
303      }
304        
  /** Some methods need to update status immediately. So, refresh the
   * status immediately.
307       * @throws IOException
308       */
309      synchronized void updateStatus() throws IOException, InterruptedException {
310        this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
311          @Override
312          public JobStatus run() throws IOException, InterruptedException {
313            return cluster.getClient().getJobStatus(status.getJobID());
314          }
315        });
316        if (this.status == null) {
      throw new IOException("Job status not available");
318        }
319        this.statustime = System.currentTimeMillis();
320      }
321      
322      public JobStatus getStatus() throws IOException, InterruptedException {
323        ensureState(JobState.RUNNING);
324        updateStatus();
325        return status;
326      }
327      
328      private void setStatus(JobStatus status) {
329        this.status = status;
330      }
331    
332      /**
333       * Returns the current state of the Job.
334       * 
   * @return the current state of the job, as a {@link JobStatus.State}
336       * @throws IOException
337       * @throws InterruptedException
338       */
339      public JobStatus.State getJobState() 
340          throws IOException, InterruptedException {
341        ensureState(JobState.RUNNING);
342        updateStatus();
343        return status.getState();
344      }
345      
346      /**
347       * Get the URL where some job progress information will be displayed.
348       * 
349       * @return the URL where some job progress information will be displayed.
350       */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl();
354      }
355    
356      /**
357       * Get the path of the submitted job configuration.
358       * 
359       * @return the path of the submitted job configuration.
360       */
361      public String getJobFile() {
362        ensureState(JobState.RUNNING);
363        return status.getJobFile();
364      }
365    
366      /**
367       * Get start time of the job.
368       * 
369       * @return the start time of the job
370       */
371      public long getStartTime() {
372        ensureState(JobState.RUNNING);
373        return status.getStartTime();
374      }
375    
376      /**
377       * Get finish time of the job.
378       * 
379       * @return the finish time of the job
380       */
381      public long getFinishTime() throws IOException, InterruptedException {
382        ensureState(JobState.RUNNING);
383        updateStatus();
384        return status.getFinishTime();
385      }
386    
387      /**
388       * Get scheduling info of the job.
389       * 
390       * @return the scheduling info of the job
391       */
392      public String getSchedulingInfo() {
393        ensureState(JobState.RUNNING);
394        return status.getSchedulingInfo();
395      }
396    
397      /**
   * Get the priority of the job.
   * 
   * @return the priority of the job
401       */
402      public JobPriority getPriority() throws IOException, InterruptedException {
403        ensureState(JobState.RUNNING);
404        updateStatus();
405        return status.getPriority();
406      }
407    
408      /**
409       * The user-specified job name.
410       */
411      public String getJobName() {
412        if (state == JobState.DEFINE) {
413          return super.getJobName();
414        }
415        ensureState(JobState.RUNNING);
416        return status.getJobName();
417      }
418    
419      public String getHistoryUrl() throws IOException, InterruptedException {
420        ensureState(JobState.RUNNING);
421        updateStatus();
422        return status.getHistoryFile();
423      }
424    
425      public boolean isRetired() throws IOException, InterruptedException {
426        ensureState(JobState.RUNNING);
427        updateStatus();
428        return status.isRetired();
429      }
430      
431      @Private
432      public Cluster getCluster() {
433        return cluster;
434      }
435    
436      /** Only for mocks in unit tests. */
437      @Private
438      private void setCluster(Cluster cluster) {
439        this.cluster = cluster;
440      }
441    
442      /**
443       * Dump stats to screen.
444       */
445      @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // fall back to the last cached status; task counts stay at zero
    } catch (InterruptedException ie) {
      // fall back to the last cached status; task counts stay at zero
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }
478    
479      /**
480       * @return taskid which caused job failure
481       * @throws IOException
482       * @throws InterruptedException
483       */
484      String getTaskFailureEventString() throws IOException,
485          InterruptedException {
    int failCount = 0;
487        TaskCompletionEvent lastEvent = null;
488        TaskCompletionEvent[] events = ugi.doAs(new 
489            PrivilegedExceptionAction<TaskCompletionEvent[]>() {
490              @Override
491              public TaskCompletionEvent[] run() throws IOException,
492              InterruptedException {
493                return cluster.getClient().getTaskCompletionEvents(
494                    status.getJobID(), 0, 10);
495              }
496            });
497        for (TaskCompletionEvent event : events) {
498          if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
499            failCount++;
500            lastEvent = event;
501          }
502        }
503        if (lastEvent == null) {
504          return "There are no failed tasks for the job. "
505              + "Job is failed due to some other reason and reason "
506              + "can be found in the logs.";
507        }
    String taskID = lastEvent.getTaskAttemptId().getTaskID().toString();
    return (" task " + taskID + " failed " +
      failCount + " times. For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
513      }
514    
515      /**
516       * Get the information of the current state of the tasks of a job.
517       * 
518       * @param type Type of the task
   * @return an array of {@link TaskReport}s for the tasks of the given type.
   * @throws IOException
   * @throws InterruptedException
521       */
  public TaskReport[] getTaskReports(final TaskType type) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), type);
      }
529          }
530        });
531      }
532    
533      /**
534       * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
535       * and 1.0.  When all map tasks have completed, the function returns 1.0.
536       * 
537       * @return the progress of the job's map-tasks.
538       * @throws IOException
539       */
540      public float mapProgress() throws IOException, InterruptedException {
541        ensureState(JobState.RUNNING);
542        ensureFreshStatus();
543        return status.getMapProgress();
544      }
545    
546      /**
547       * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
548       * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
549       * 
550       * @return the progress of the job's reduce-tasks.
551       * @throws IOException
552       */
553      public float reduceProgress() throws IOException, InterruptedException {
554        ensureState(JobState.RUNNING);
555        ensureFreshStatus();
556        return status.getReduceProgress();
557      }
558    
559      /**
560       * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
561       * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
562       * 
563       * @return the progress of the job's cleanup-tasks.
564       * @throws IOException
565       */
566      public float cleanupProgress() throws IOException, InterruptedException {
567        ensureState(JobState.RUNNING);
568        ensureFreshStatus();
569        return status.getCleanupProgress();
570      }
571    
572      /**
573       * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
574       * and 1.0.  When all setup tasks have completed, the function returns 1.0.
575       * 
576       * @return the progress of the job's setup-tasks.
577       * @throws IOException
578       */
579      public float setupProgress() throws IOException, InterruptedException {
580        ensureState(JobState.RUNNING);
581        ensureFreshStatus();
582        return status.getSetupProgress();
583      }
584    
585      /**
   * Check if the job is finished. This call does not block waiting for the
   * job to complete; it only refreshes and inspects the current status.
587       * This is a non-blocking call.
588       * 
589       * @return <code>true</code> if the job is complete, else <code>false</code>.
590       * @throws IOException
591       */
592      public boolean isComplete() throws IOException, InterruptedException {
593        ensureState(JobState.RUNNING);
594        updateStatus();
595        return status.isJobComplete();
596      }
597    
598      /**
599       * Check if the job completed successfully. 
600       * 
601       * @return <code>true</code> if the job succeeded, else <code>false</code>.
602       * @throws IOException
603       */
604      public boolean isSuccessful() throws IOException, InterruptedException {
605        ensureState(JobState.RUNNING);
606        updateStatus();
607        return status.getState() == JobStatus.State.SUCCEEDED;
608      }
609    
610      /**
611       * Kill the running job.  Blocks until all job tasks have been
612       * killed as well.  If the job is no longer running, it simply returns.
613       * 
614       * @throws IOException
615       */
616      public void killJob() throws IOException, InterruptedException {
617        ensureState(JobState.RUNNING);
618        cluster.getClient().killJob(getJobID());
619      }
620    
621      /**
622       * Set the priority of a running job.
623       * @param priority the new priority for the job.
624       * @throws IOException
625       */
  public void setPriority(final JobPriority priority) 
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), priority.toString());
          return null;
        }
      });
    }
  }
643    
644      /**
645       * Get events indicating completion (success/failure) of component tasks.
646       *  
647       * @param startFrom index to start fetching events from
648       * @param numEvents number of events to fetch
649       * @return an array of {@link TaskCompletionEvent}s
650       * @throws IOException
651       */
652      public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
653          final int numEvents) throws IOException, InterruptedException {
654        ensureState(JobState.RUNNING);
655        return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
656          @Override
657          public TaskCompletionEvent[] run() throws IOException, InterruptedException {
658            return cluster.getClient().getTaskCompletionEvents(getJobID(),
659                startFrom, numEvents); 
660          }
661        });
662        }
663      
664      /**
665       * Kill indicated task attempt.
666       * 
   * @param taskId the id of the task attempt to be killed.
   * @return <code>true</code> if the kill request succeeded.
   * @throws IOException
669       */
670      public boolean killTask(final TaskAttemptID taskId) 
671          throws IOException, InterruptedException {
672        ensureState(JobState.RUNNING);
673        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
674          public Boolean run() throws IOException, InterruptedException {
675            return cluster.getClient().killTask(taskId, false);
676          }
677        });
678      }
679    
680      /**
681       * Fail indicated task attempt.
682       * 
   * @param taskId the id of the task attempt to be failed.
   * @return <code>true</code> if the fail request succeeded.
   * @throws IOException
685       */
686      public boolean failTask(final TaskAttemptID taskId) 
687          throws IOException, InterruptedException {
688        ensureState(JobState.RUNNING);
689        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
690          @Override
691          public Boolean run() throws IOException, InterruptedException {
692            return cluster.getClient().killTask(taskId, true);
693          }
694        });
695      }
696    
697      /**
698       * Gets the counters for this job. May return null if the job has been
699       * retired and the job is no longer in the completed job store.
700       * 
701       * @return the counters for this job.
702       * @throws IOException
703       */
704      public Counters getCounters() 
705          throws IOException, InterruptedException {
706        ensureState(JobState.RUNNING);
707        return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
708          @Override
709          public Counters run() throws IOException, InterruptedException {
710            return cluster.getClient().getJobCounters(getJobID());
711          }
712        });
713      }
714    
715      /**
716       * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt to query.
718       * @return the list of diagnostic messages for the task
719       * @throws IOException
720       */
721      public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
722          throws IOException, InterruptedException {
723        ensureState(JobState.RUNNING);
724        return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
725          @Override
726          public String[] run() throws IOException, InterruptedException {
727            return cluster.getClient().getTaskDiagnostics(taskid);
728          }
729        });
730      }
731    
732      /**
733       * Set the number of reduce tasks for the job.
734       * @param tasks the number of reduce tasks
735       * @throws IllegalStateException if the job is submitted
736       */
737      public void setNumReduceTasks(int tasks) throws IllegalStateException {
738        ensureState(JobState.DEFINE);
739        conf.setNumReduceTasks(tasks);
740      }
741    
742      /**
743       * Set the current working directory for the default file system.
744       * 
745       * @param dir the new current working directory.
746       * @throws IllegalStateException if the job is submitted
747       */
748      public void setWorkingDirectory(Path dir) throws IOException {
749        ensureState(JobState.DEFINE);
750        conf.setWorkingDirectory(dir);
751      }
752    
753      /**
754       * Set the {@link InputFormat} for the job.
755       * @param cls the <code>InputFormat</code> to use
756       * @throws IllegalStateException if the job is submitted
757       */
758      public void setInputFormatClass(Class<? extends InputFormat> cls
759                                      ) throws IllegalStateException {
760        ensureState(JobState.DEFINE);
761        conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
762                      InputFormat.class);
763      }
764    
765      /**
766       * Set the {@link OutputFormat} for the job.
767       * @param cls the <code>OutputFormat</code> to use
768       * @throws IllegalStateException if the job is submitted
769       */
770      public void setOutputFormatClass(Class<? extends OutputFormat> cls
771                                       ) throws IllegalStateException {
772        ensureState(JobState.DEFINE);
773        conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
774                      OutputFormat.class);
775      }
776    
777      /**
778       * Set the {@link Mapper} for the job.
779       * @param cls the <code>Mapper</code> to use
780       * @throws IllegalStateException if the job is submitted
781       */
782      public void setMapperClass(Class<? extends Mapper> cls
783                                 ) throws IllegalStateException {
784        ensureState(JobState.DEFINE);
785        conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
786      }
787    
788      /**
789       * Set the Jar by finding where a given class came from.
790       * @param cls the example class
791       */
792      public void setJarByClass(Class<?> cls) {
793        ensureState(JobState.DEFINE);
794        conf.setJarByClass(cls);
795      }
796    
797      /**
798       * Set the job jar 
799       */
800      public void setJar(String jar) {
801        ensureState(JobState.DEFINE);
802        conf.setJar(jar);
803      }
804    
805      /**
806       * Set the reported username for this job.
807       * 
808       * @param user the username for this job.
809       */
810      public void setUser(String user) {
811        ensureState(JobState.DEFINE);
812        conf.setUser(user);
813      }
814    
815      /**
816       * Set the combiner class for the job.
817       * @param cls the combiner to use
818       * @throws IllegalStateException if the job is submitted
819       */
820      public void setCombinerClass(Class<? extends Reducer> cls
821                                   ) throws IllegalStateException {
822        ensureState(JobState.DEFINE);
823        conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
824      }
825    
826      /**
827       * Set the {@link Reducer} for the job.
828       * @param cls the <code>Reducer</code> to use
829       * @throws IllegalStateException if the job is submitted
830       */
831      public void setReducerClass(Class<? extends Reducer> cls
832                                  ) throws IllegalStateException {
833        ensureState(JobState.DEFINE);
834        conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
835      }
836    
837      /**
838       * Set the {@link Partitioner} for the job.
839       * @param cls the <code>Partitioner</code> to use
840       * @throws IllegalStateException if the job is submitted
841       */
842      public void setPartitionerClass(Class<? extends Partitioner> cls
843                                      ) throws IllegalStateException {
844        ensureState(JobState.DEFINE);
845        conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
846                      Partitioner.class);
847      }
848    
849      /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
853       * 
854       * @param theClass the map output key class.
855       * @throws IllegalStateException if the job is submitted
856       */
857      public void setMapOutputKeyClass(Class<?> theClass
858                                       ) throws IllegalStateException {
859        ensureState(JobState.DEFINE);
860        conf.setMapOutputKeyClass(theClass);
861      }
862    
863      /**
864       * Set the value class for the map output data. This allows the user to
865       * specify the map output value class to be different than the final output
866       * value class.
867       * 
868       * @param theClass the map output value class.
869       * @throws IllegalStateException if the job is submitted
870       */
871      public void setMapOutputValueClass(Class<?> theClass
872                                         ) throws IllegalStateException {
873        ensureState(JobState.DEFINE);
874        conf.setMapOutputValueClass(theClass);
875      }
876    
877      /**
878       * Set the key class for the job output data.
879       * 
880       * @param theClass the key class for the job output data.
881       * @throws IllegalStateException if the job is submitted
882       */
883      public void setOutputKeyClass(Class<?> theClass
884                                    ) throws IllegalStateException {
885        ensureState(JobState.DEFINE);
886        conf.setOutputKeyClass(theClass);
887      }
888    
889      /**
890       * Set the value class for job outputs.
891       * 
892       * @param theClass the value class for job outputs.
893       * @throws IllegalStateException if the job is submitted
894       */
895      public void setOutputValueClass(Class<?> theClass
896                                      ) throws IllegalStateException {
897        ensureState(JobState.DEFINE);
898        conf.setOutputValueClass(theClass);
899      }
900    
901      /**
902       * Define the comparator that controls how the keys are sorted before they
903       * are passed to the {@link Reducer}.
904       * @param cls the raw comparator
905       * @throws IllegalStateException if the job is submitted
906       */
907      public void setSortComparatorClass(Class<? extends RawComparator> cls
908                                         ) throws IllegalStateException {
909        ensureState(JobState.DEFINE);
910        conf.setOutputKeyComparatorClass(cls);
911      }
912    
913      /**
914       * Define the comparator that controls which keys are grouped together
915       * for a single call to 
916       * {@link Reducer#reduce(Object, Iterable, 
917       *                       org.apache.hadoop.mapreduce.Reducer.Context)}
918       * @param cls the raw comparator to use
919       * @throws IllegalStateException if the job is submitted
920       */
921      public void setGroupingComparatorClass(Class<? extends RawComparator> cls
922                                             ) throws IllegalStateException {
923        ensureState(JobState.DEFINE);
924        conf.setOutputValueGroupingComparator(cls);
925      }
926    
927      /**
928       * Set the user-specified job name.
929       * 
930       * @param name the job's new name.
931       * @throws IllegalStateException if the job is submitted
932       */
933      public void setJobName(String name) throws IllegalStateException {
934        ensureState(JobState.DEFINE);
935        conf.setJobName(name);
936      }
937    
938      /**
939       * Turn speculative execution on or off for this job. 
940       * 
941       * @param speculativeExecution <code>true</code> if speculative execution 
942       *                             should be turned on, else <code>false</code>.
943       */
944      public void setSpeculativeExecution(boolean speculativeExecution) {
945        ensureState(JobState.DEFINE);
946        conf.setSpeculativeExecution(speculativeExecution);
947      }
948    
949      /**
950       * Turn speculative execution on or off for this job for map tasks. 
951       * 
952       * @param speculativeExecution <code>true</code> if speculative execution 
953       *                             should be turned on for map tasks,
954       *                             else <code>false</code>.
955       */
956      public void setMapSpeculativeExecution(boolean speculativeExecution) {
957        ensureState(JobState.DEFINE);
958        conf.setMapSpeculativeExecution(speculativeExecution);
959      }
960    
961      /**
962       * Turn speculative execution on or off for this job for reduce tasks. 
963       * 
964       * @param speculativeExecution <code>true</code> if speculative execution 
965       *                             should be turned on for reduce tasks,
966       *                             else <code>false</code>.
967       */
968      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
969        ensureState(JobState.DEFINE);
970        conf.setReduceSpeculativeExecution(speculativeExecution);
971      }
972    
973      /**
   * Specify whether job-setup and job-cleanup are needed for the job.
975       * 
976       * @param needed If <code>true</code>, job-setup and job-cleanup will be
977       *               considered from {@link OutputCommitter} 
978       *               else ignored.
979       */
980      public void setJobSetupCleanupNeeded(boolean needed) {
981        ensureState(JobState.DEFINE);
982        conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
983      }
984    
985      /**
986       * Set the given set of archives
987       * @param archives The list of archives that need to be localized
988       */
989      public void setCacheArchives(URI[] archives) {
990        ensureState(JobState.DEFINE);
991        DistributedCache.setCacheArchives(archives, conf);
992      }
993    
994      /**
995       * Set the given set of files
996       * @param files The list of files that need to be localized
997       */
998      public void setCacheFiles(URI[] files) {
999        ensureState(JobState.DEFINE);
1000        DistributedCache.setCacheFiles(files, conf);
1001      }
1002    
1003      /**
   * Add an archive to be localized.
1005       * @param uri The uri of the cache to be localized
1006       */
1007      public void addCacheArchive(URI uri) {
1008        ensureState(JobState.DEFINE);
1009        DistributedCache.addCacheArchive(uri, conf);
1010      }
1011      
1012      /**
   * Add a file to be localized.
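   * <p>For example (the URI is illustrative; the fragment names the symlink
   * created in the task's working directory):</p>
   * <pre>{@code
   * job.addCacheFile(new URI("hdfs:///apps/lookup.dat#lookup"));
   * }</pre>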
1014       * @param uri The uri of the cache to be localized
1015       */
1016      public void addCacheFile(URI uri) {
1017        ensureState(JobState.DEFINE);
1018        DistributedCache.addCacheFile(uri, conf);
1019      }
1020    
1021      /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
1024       * 
1025       * Files added with this method will not be unpacked while being added to the
1026       * classpath.
1027       * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
1028       * method instead.
1029       *
1030       * @param file Path of the file to be added
1031       */
1032      public void addFileToClassPath(Path file)
1033        throws IOException {
1034        ensureState(JobState.DEFINE);
1035        DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
1036      }
1037    
1038      /**
1039       * Add an archive path to the current set of classpath entries. It adds the
1040       * archive to cache as well.
1041       * 
1042       * Archive files will be unpacked and added to the classpath
1043       * when being distributed.
1044       *
1045       * @param archive Path of the archive to be added
1046       */
1047      public void addArchiveToClassPath(Path archive)
1048        throws IOException {
1049        ensureState(JobState.DEFINE);
1050        DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
1051      }
1052    
1053      /**
1054       * Originally intended to enable symlinks, but currently symlinks cannot be
1055       * disabled.
1056       */
1057      @Deprecated
1058      public void createSymlink() {
1059        ensureState(JobState.DEFINE);
1060        DistributedCache.createSymlink(conf);
1061      }
1062      
1063      /** 
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
1066       * 
1067       * @param n the number of attempts per map task.
1068       */
1069      public void setMaxMapAttempts(int n) {
1070        ensureState(JobState.DEFINE);
1071        conf.setMaxMapAttempts(n);
1072      }
1073    
1074      /** 
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
1077       * 
1078       * @param n the number of attempts per reduce task.
1079       */
1080      public void setMaxReduceAttempts(int n) {
1081        ensureState(JobState.DEFINE);
1082        conf.setMaxReduceAttempts(n);
1083      }
1084    
1085      /**
   * Set whether the system should collect profiler information for some of 
   * the tasks in this job. The information is stored in the user log 
   * directory.
1089       * @param newValue true means it should be gathered
1090       */
1091      public void setProfileEnabled(boolean newValue) {
1092        ensureState(JobState.DEFINE);
1093        conf.setProfileEnabled(newValue);
1094      }
1095    
1096      /**
1097       * Set the profiler configuration arguments. If the string contains a '%s' it
1098       * will be replaced with the name of the profiling output file when the task
1099       * runs.
1100       *
1101       * This value is passed to the task child JVM on the command line.
1102       *
1103       * @param value the configuration string
1104       */
1105      public void setProfileParams(String value) {
1106        ensureState(JobState.DEFINE);
1107        conf.setProfileParams(value);
1108      }
1109    
1110      /**
1111       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1112       * must also be called.
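   * <p>For example, to profile only the first two map tasks (range syntax as
   * accepted by {@link Configuration.IntegerRanges}):</p>
   * <pre>{@code
   * job.setProfileEnabled(true);
   * job.setProfileTaskRange(true, "0-1");
   * }</pre>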
   * @param isMap <code>true</code> to set the range for map tasks,
   *              <code>false</code> for reduce tasks
   * @param newValue a set of integer ranges of the task ids
1114       */
1115      public void setProfileTaskRange(boolean isMap, String newValue) {
1116        ensureState(JobState.DEFINE);
1117        conf.setProfileTaskRange(isMap, newValue);
1118      }
1119    
1120      private void ensureNotSet(String attr, String msg) throws IOException {
1121        if (conf.get(attr) != null) {
1122          throw new IOException(attr + " is incompatible with " + msg + " mode.");
1123        }    
1124      }
1125      
1126      /**
1127       * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
1128       * tokens upon job completion. Defaults to true.
1129       */
1130      public void setCancelDelegationTokenUponJobCompletion(boolean value) {
1131        ensureState(JobState.DEFINE);
1132        conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
1133      }
1134    
1135      /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
1139       */
1140      private void setUseNewAPI() throws IOException {
1141        int numReduces = conf.getNumReduceTasks();
1142        String oldMapperClass = "mapred.mapper.class";
1143        String oldReduceClass = "mapred.reducer.class";
1144        conf.setBooleanIfUnset("mapred.mapper.new-api",
1145                               conf.get(oldMapperClass) == null);
1146        if (conf.getUseNewMapper()) {
1147          String mode = "new map API";
1148          ensureNotSet("mapred.input.format.class", mode);
1149          ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
1157          ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
1158          ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
1164        }
1165        if (numReduces != 0) {
1166          conf.setBooleanIfUnset("mapred.reducer.new-api",
1167                                 conf.get(oldReduceClass) == null);
1168          if (conf.getUseNewReducer()) {
1169            String mode = "new reduce API";
1170            ensureNotSet("mapred.output.format.class", mode);
1171            ensureNotSet(oldReduceClass, mode);   
1172          } else {
1173            String mode = "reduce compatability";
1174            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1175            ensureNotSet(REDUCE_CLASS_ATTR, mode);   
1176          }
1177        }   
1178      }
1179    
1180      private synchronized void connect()
1181              throws IOException, InterruptedException, ClassNotFoundException {
1182        if (cluster == null) {
1183          cluster = 
1184            ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
1185                       public Cluster run()
1186                              throws IOException, InterruptedException, 
1187                                     ClassNotFoundException {
1188                         return new Cluster(getConfiguration());
1189                       }
1190                     });
1191        }
1192      }
1193    
1194      boolean isConnected() {
1195        return cluster != null;
1196      }
1197    
1198      /** Only for mocking via unit tests. */
1199      @Private
1200      public JobSubmitter getJobSubmitter(FileSystem fs, 
1201          ClientProtocol submitClient) throws IOException {
1202        return new JobSubmitter(fs, submitClient);
1203      }
1204      /**
1205       * Submit the job to the cluster and return immediately.
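   * <p>A non-blocking sketch that submits and then polls for completion:</p>
   * <pre>{@code
   * job.submit();
   * while (!job.isComplete()) {
   *   Thread.sleep(Job.getCompletionPollInterval(job.getConfiguration()));
   * }
   * }</pre>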
1206       * @throws IOException
1207       */
1208      public void submit() 
1209             throws IOException, InterruptedException, ClassNotFoundException {
1210        ensureState(JobState.DEFINE);
1211        setUseNewAPI();
1212        connect();
1213        final JobSubmitter submitter = 
1214            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
1215        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
1216          public JobStatus run() throws IOException, InterruptedException, 
1217          ClassNotFoundException {
1218            return submitter.submitJobInternal(Job.this, cluster);
1219          }
1220        });
1221        state = JobState.RUNNING;
1222        LOG.info("The url to track the job: " + getTrackingURL());
1223       }
1224      
1225      /**
1226       * Submit the job to the cluster and wait for it to finish.
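   * <p>Typical driver usage (the driver class name is illustrative):</p>
   * <pre>{@code
   * Job job = Job.getInstance(new Configuration(), "my-job");
   * job.setJarByClass(MyDriver.class);
   * // ... set mapper, reducer, input and output ...
   * System.exit(job.waitForCompletion(true) ? 0 : 1);
   * }</pre>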
1227       * @param verbose print the progress to the user
1228       * @return true if the job succeeded
1229       * @throws IOException thrown if the communication with the 
1230       *         <code>JobTracker</code> is lost
1231       */
1232      public boolean waitForCompletion(boolean verbose
1233                                       ) throws IOException, InterruptedException,
1234                                                ClassNotFoundException {
1235        if (state == JobState.DEFINE) {
1236          submit();
1237        }
1238        if (verbose) {
1239          monitorAndPrintJob();
1240        } else {
1241          // get the completion poll interval from the client.
1242          int completionPollIntervalMillis = 
1243            Job.getCompletionPollInterval(cluster.getConf());
1244          while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore and re-check completion in the loop condition
        }
1249          }
1250        }
1251        return isSuccessful();
1252      }
1253      
1254      /**
1255       * Monitor a job and print status in real-time as progress is made and tasks 
1256       * fail.
1257       * @return true if the job succeeded
1258       * @throws IOException if communication to the JobTracker fails
1259       */
1260      public boolean monitorAndPrintJob() 
1261          throws IOException, InterruptedException {
1262        String lastReport = null;
1263        Job.TaskStatusFilter filter;
1264        Configuration clientConf = getConfiguration();
1265        filter = Job.getTaskOutputFilter(clientConf);
1266        JobID jobId = getJobID();
1267        LOG.info("Running job: " + jobId);
1268        int eventCounter = 0;
1269        boolean profiling = getProfileEnabled();
1270        IntegerRanges mapRanges = getProfileTaskRange(true);
1271        IntegerRanges reduceRanges = getProfileTaskRange(false);
1272        int progMonitorPollIntervalMillis = 
1273          Job.getProgressPollInterval(clientConf);
1274        /* make sure to report full progress after the job is done */
1275        boolean reportedAfterCompletion = false;
1276        boolean reportedUberMode = false;
1277        while (!isComplete() || !reportedAfterCompletion) {
1278          if (isComplete()) {
1279            reportedAfterCompletion = true;
1280          } else {
1281            Thread.sleep(progMonitorPollIntervalMillis);
1282          }
1283          if (status.getState() == JobStatus.State.PREP) {
1284            continue;
1285          }      
1286          if (!reportedUberMode) {
1287            reportedUberMode = true;
1288            LOG.info("Job " + jobId + " running in uber mode : " + isUber());
1289          }      
1290          String report = 
1291            (" map " + StringUtils.formatPercent(mapProgress(), 0)+
1292                " reduce " + 
1293                StringUtils.formatPercent(reduceProgress(), 0));
1294          if (!report.equals(lastReport)) {
1295            LOG.info(report);
1296            lastReport = report;
1297          }
1298    
1299          TaskCompletionEvent[] events = 
1300            getTaskCompletionEvents(eventCounter, 10); 
1301          eventCounter += events.length;
1302          printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
1303        }
1304        boolean success = isSuccessful();
1305        if (success) {
1306          LOG.info("Job " + jobId + " completed successfully");
1307        } else {
1308          LOG.info("Job " + jobId + " failed with state " + status.getState() + 
1309              " due to: " + status.getFailureInfo());
1310        }
1311        Counters counters = getCounters();
1312        if (counters != null) {
1313          LOG.info(counters.toString());
1314        }
1315        return success;
1316      }
1317    
1318      /**
1319       * @return true if the profile parameters indicate that this is using
1320       * hprof, which generates profile files in a particular location
1321       * that we can retrieve to the client.
1322       */
1323      private boolean shouldDownloadProfile() {
1324        // Check the argument string that was used to initialize profiling.
1325        // If this indicates hprof and file-based output, then we're ok to
1326        // download.
1327        String profileParams = getProfileParams();
1328    
1329        if (null == profileParams) {
1330          return false;
1331        }
1332    
1333        // Split this on whitespace.
1334        String [] parts = profileParams.split("[ \\t]+");
1335    
1336        // If any of these indicate hprof, and the use of output files, return true.
1337        boolean hprofFound = false;
1338        boolean fileFound = false;
1339        for (String p : parts) {
1340          if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
1341            hprofFound = true;
1342    
1343            // This contains a number of comma-delimited components, one of which
1344            // may specify the file to write to. Make sure this is present and
1345            // not empty.
1346            String [] subparts = p.split(",");
1347            for (String sub : subparts) {
1348              if (sub.startsWith("file=") && sub.length() != "file=".length()) {
1349                fileFound = true;
1350              }
1351            }
1352          }
1353        }
1354    
1355        return hprofFound && fileFound;
1356      }
1357    
1358      private void printTaskEvents(TaskCompletionEvent[] events,
1359          Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
1360          IntegerRanges reduceRanges) throws IOException, InterruptedException {
1361        for (TaskCompletionEvent event : events) {
1362          switch (filter) {
1363          case NONE:
1364            break;
1365          case SUCCEEDED:
1366            if (event.getStatus() == 
1367              TaskCompletionEvent.Status.SUCCEEDED) {
1368              LOG.info(event.toString());
1369            }
1370            break; 
1371          case FAILED:
1372            if (event.getStatus() == 
1373              TaskCompletionEvent.Status.FAILED) {
1374              LOG.info(event.toString());
1375              // Displaying the task diagnostic information
1376              TaskAttemptID taskId = event.getTaskAttemptId();
1377              String[] taskDiagnostics = getTaskDiagnostics(taskId); 
1378              if (taskDiagnostics != null) {
1379                for (String diagnostics : taskDiagnostics) {
1380                  System.err.println(diagnostics);
1381                }
1382              }
1383            }
1384            break; 
1385          case KILLED:
1386            if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
1387              LOG.info(event.toString());
1388            }
1389            break; 
1390          case ALL:
1391            LOG.info(event.toString());
1392            break;
1393          }
1394        }
1395      }
1396    
1397      /** The interval at which monitorAndPrintJob() prints status */
1398      public static int getProgressPollInterval(Configuration conf) {
1399        // Read progress monitor poll interval from config. Default is 1 second.
1400        int progMonitorPollIntervalMillis = conf.getInt(
1401          PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
1402        if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
1406          progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
1407        }
1408        return progMonitorPollIntervalMillis;
1409      }
1410    
1411      /** The interval at which waitForCompletion() should check. */
1412      public static int getCompletionPollInterval(Configuration conf) {
1413        int completionPollIntervalMillis = conf.getInt(
1414          COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
1415        if (completionPollIntervalMillis < 1) { 
1416          LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
1417           " has been set to an invalid value; "
1418           + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
1419          completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
1420        }
1421        return completionPollIntervalMillis;
1422      }
1423    
1424      /**
1425       * Get the task output filter.
1426       * 
1427       * @param conf the configuration.
1428       * @return the filter level.
1429       */
1430      public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
1431        return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
1432      }
1433    
1434      /**
1435       * Modify the Configuration to set the task output filter.
1436       * 
1437       * @param conf the Configuration to modify.
1438       * @param newValue the value to set.
1439       */
1440      public static void setTaskOutputFilter(Configuration conf, 
1441          TaskStatusFilter newValue) {
1442        conf.set(Job.OUTPUT_FILTER, newValue.toString());
1443      }
1444    
1445      public boolean isUber() throws IOException, InterruptedException {
1446        ensureState(JobState.RUNNING);
1447        updateStatus();
1448        return status.isUber();
1449      }
1450      
1451    }