/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 * 
 * <code>JobClient</code> provides facilities to submit jobs, track their 
 * progress, access component-tasks' reports and logs, get the Map-Reduce 
 * cluster's status information, and so on.
 * 
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the 
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system 
 *   directory on the distributed file-system. 
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol>
 *  
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
 * the job and monitor its progress.
 * 
 * <p>Here is an example of how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 * 
 * <h4 id="JobControl">Job Control</h4>
 * 
 * <p>At times clients need to chain map-reduce jobs to accomplish complex 
 * tasks which cannot be done via a single map-reduce job. This is fairly 
 * easy, since the output of a job typically goes to the distributed 
 * file-system, where it can be used as the input for the next job.</p>
 * 
 * <p>However, this also means that the onus of ensuring jobs are complete 
 * (success/failure) lies squarely on the clients. In such situations the 
 * various job-control options are (see also the sketch after this list):
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after 
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client should 
 *   then poll the returned {@link RunningJob} handle to query status and 
 *   make scheduling decisions.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a 
 *   notification upon job-completion, thus avoiding polling.
 *   </li>
 * </ol>
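 *
 * <p>For example, a minimal submit-and-poll sketch (the failure handler and
 * the five-second interval are illustrative, not part of the API):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);                // poll at an interval you choose
 *     }
 *     if (!running.isSuccessful()) {
 *       handleFailure(running);            // hypothetical failure handler
 *     }
 * </pre></blockquote></p>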
 * 
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
  /* Records that getDelegationToken was called. This is a hack for Oozie
   * to make sure we add history-server delegation tokens to the credentials
   * for the job. Since the API only allows one delegation token to be
   * returned, we have to add this hack.
   */
  private boolean getDelegationTokenCalled = false;
  /* do we need an HS delegation token for this client */
  static final String HS_DELEGATION_TOKEN_REQUIRED 
      = "mapreduce.history.server.delegationtoken.required";
  
  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It holds
   * a {@link Job} object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We store the {@link Job} used to serve all requests.  If the job is
     * null, then we cannot perform any of the tasks.  The job might be
     * null if the cluster has completely forgotten about the job.
     * (e.g., 24 hours after the job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }
    
    /** @deprecated This method is deprecated and will be removed. Applications should 
     * rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }
    
    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      try {
        return job.reduceProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      try {
        return job.setupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      try {
        return job.isComplete();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      try {
        return job.isSuccessful();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Queries the service for the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      try {
        job.killJob();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** Set the priority of the job.
     * @param priority new priority of the job. 
     */
    public synchronized void setJobPriority(String priority) 
                                                throws IOException {
      try {
        job.setPriority(
          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      try {
        if (shouldFail) {
          job.failTask(taskId);
        } else {
          job.killTask(taskId);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }
    
    /**
     * Fetch task completion events from the cluster for this job.
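     * <p>A typical pattern (a sketch; this implementation returns events in
     * windows of at most 10, and <code>runningJob</code> stands for any
     * {@link RunningJob} handle) is to advance <code>startFrom</code> by the
     * number of events received:</p>
     * <pre>
     *   int from = 0;
     *   TaskCompletionEvent[] events;
     *   do {
     *     events = runningJob.getTaskCompletionEvents(from);
     *     from += events.length;
     *     // inspect events here
     *   } while (events.length > 0);
     * </pre>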
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events = 
          job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }
        
    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      try { 
        Counters result = null;
        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
        if (temp != null) {
          result = Counters.downgrade(temp);
        }
        return result;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try { 
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }
    
    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * UGI of the client. We store this UGI when the client is created and 
   * then make sure that the same UGI is used to run the various protocols.
   */
  UserGroupInformation clientUgi;
  
  /**
   * Create a job client.
   */
  public JobClient() {
  }
    
  /**
   * Build a job client with the given {@link JobConf}, and connect to the 
   * default cluster.
   * 
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration}, 
   * and connect to the default cluster.
   * 
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client and connect to the indicated job tracker.
   * 
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr, 
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle.  We need this to prepare jobs
   * for submission to the MapReduce system.
   * 
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try { 
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }
  
  /**
   * Submit a job to the MR system.
   * 
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * 
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
                                                     InvalidJobConfException, 
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }
    
  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * 
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                  IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      if (getDelegationTokenCalled) {
        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
        getDelegationTokenCalled = false;
      }
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException, 
          InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   * 
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the 
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster);
        } 
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. 
   */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }
  
  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
  
  /**
   * Get the information on the current state of the map tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }
  
  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
    IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /** @deprecated Applications should rather use {@link #getMapTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }
  
  /**
   * Get the information on the current state of the reduce tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */    
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information on the current state of the cleanup tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */    
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information on the current state of the setup tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */    
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }
  
  /**
   * Display the information about a job's tasks of a particular type and
   * in a particular state.
   * 
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task 
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state) 
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Get status information about the Map-Reduce cluster.
   *  
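   * <p>For instance, a small sketch that reports overall slot usage
   * (<code>client</code> being any connected <code>JobClient</code>):</p>
   * <pre>
   *   ClusterStatus status = client.getClusterStatus();
   *   System.out.println("trackers: " + status.getTaskTrackers());
   *   System.out.println("map slots: " + status.getMapTasks()
   *       + "/" + status.getMaxMapTasks());
   * </pre>
   *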
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *  
   * @param  detailed if true then get a detailed status including the
   *         tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), 
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** 
   * Get the jobs that are not completed and not failed.
   * 
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /** 
   * Get the jobs that are submitted.
   * 
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run() 
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /** 
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   * 
   * @param job the job configuration.
   * @throws IOException if the job fails
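   *
   * <p>A minimal usage sketch (<code>MyJob</code> is a stand-in for your
   * job class):</p>
   * <pre>
   *   JobConf conf = new JobConf(MyJob.class);
   *   conf.setJobName("myjob");
   *   // ... set input/output paths, mapper, reducer ...
   *   RunningJob result = JobClient.runJob(conf);  // blocks until completion
   * </pre>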
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }
  
  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, 
                                    RunningJob job
  ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
  }
  
  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks whose output matches 
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }
    
  /**
   * Get the task output filter out of the JobConf.
   * 
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
                                            "FAILED"));
  }
    
  /**
   * Modify the JobConf to set the task output filter.
   * 
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job, 
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }
    
  /**
   * Returns the task output filter.
   * @return task filter. 
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter; 
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *  
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *  
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are to be placed.
   * 
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level 
   * queues configured.
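   *
   * <p>Sketch of walking the queue hierarchy from the roots (the
   * <code>print</code> helper is hypothetical):</p>
   * <pre>
   *   void print(JobQueueInfo queue, String indent) {
   *     System.out.println(indent + queue.getQueueName());
   *     for (JobQueueInfo child : queue.getChildren()) {
   *       print(child, indent + "  ");
   *     }
   *   }
   *   for (JobQueueInfo root : client.getRootQueues()) {
   *     print(root, "");
   *   }
   * </pre>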
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate 
   * children of queue queueName.
   * 
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Return an array of queue information objects about all the job queues
   * configured.
   * 
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets all the jobs which were added to a particular job queue.
   * 
   * @param queueName name of the job queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats = 
        queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets the queue information associated with a particular job queue.
   * 
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new 
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets the queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
        clientUgi.doAs(new 
            PrivilegedExceptionAction
            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
                  throws IOException, InterruptedException {
                return cluster.getQueueAclsForCurrentUser();
              }
        });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
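   * <p>A sketch of fetching a token and stashing it in a credentials object
   * (the renewer name is illustrative):</p>
   * <pre>
   *   Token&lt;DelegationTokenIdentifier&gt; token =
   *       client.getDelegationToken(new Text("renewerUser"));
   *   Credentials credentials = new Credentials();
   *   credentials.addToken(token.getService(), token);
   * </pre>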
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier> 
    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
    getDelegationTokenCalled = true;
    return clientUgi.doAs(new 
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException, 
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException, 
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException, 
                                             InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> as a command-line {@link Tool} via 
   * {@link ToolRunner}.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}