001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.jobcontrol;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collection;
024    import java.util.Iterator;
025    import java.util.LinkedList;
026    import java.util.List;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience;
031    import org.apache.hadoop.classification.InterfaceStability;
032    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
033    import org.apache.hadoop.util.StringUtils;
034    
035    /** 
036     *  This class encapsulates a set of MapReduce jobs and its dependency.
037     *   
038     *  It tracks the states of the jobs by placing them into different tables
039     *  according to their states. 
040     *  
041     *  This class provides APIs for the client app to add a job to the group 
042     *  and to get the jobs in the group in different states. When a job is 
043     *  added, an ID unique to the group is assigned to the job. 
044     *  
045     *  This class has a thread that submits jobs when they become ready, 
046     *  monitors the states of the running jobs, and updates the states of jobs
047     *  based on the state changes of their depending jobs states. The class 
048     *  provides APIs for suspending/resuming the thread, and 
049     *  for stopping the thread.
050     *  
051     */
052    @InterfaceAudience.Public
053    @InterfaceStability.Evolving
054    public class JobControl implements Runnable {
055      private static final Log LOG = LogFactory.getLog(JobControl.class);
056    
057      // The thread can be in one of the following state
058      public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
059            
060      private ThreadState runnerState;                      // the thread state
061            
062      private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
063      private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
064      private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
065            
066      private long nextJobID;
067      private String groupName;
068            
069      /** 
070       * Construct a job control for a group of jobs.
071       * @param groupName a name identifying this group
072       */
073      public JobControl(String groupName) {
074        this.nextJobID = -1;
075        this.groupName = groupName;
076        this.runnerState = ThreadState.READY;
077      }
078            
079      private static List<ControlledJob> toList(
080                       LinkedList<ControlledJob> jobs) {
081        ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
082        synchronized (jobs) {
083          for (ControlledJob job : jobs) {
084            retv.add(job);
085          }
086        }
087        return retv;
088      }
089            
090      synchronized private List<ControlledJob> getJobsIn(State state) {
091        LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
092        for(ControlledJob j: jobsInProgress) {
093          if(j.getJobState() == state) {
094            l.add(j);
095          }
096        }
097        return l;
098      }
099      
100      /**
101       * @return the jobs in the waiting state
102       */
103      public List<ControlledJob> getWaitingJobList() {
104        return getJobsIn(State.WAITING);
105      }
106            
107      /**
108       * @return the jobs in the running state
109       */
110      public List<ControlledJob> getRunningJobList() {
111        return getJobsIn(State.RUNNING);
112      }
113            
114      /**
115       * @return the jobs in the ready state
116       */
117      public List<ControlledJob> getReadyJobsList() {
118        return getJobsIn(State.READY);
119      }
120            
121      /**
122       * @return the jobs in the success state
123       */
124      public List<ControlledJob> getSuccessfulJobList() {
125        return toList(this.successfulJobs);
126      }
127            
128      public List<ControlledJob> getFailedJobList() {
129        return toList(this.failedJobs);
130      }
131            
132      private String getNextJobID() {
133        nextJobID += 1;
134        return this.groupName + this.nextJobID;
135      }
136    
137      /**
138       * Add a new job.
139       * @param aJob the new job
140       */
141      synchronized public String addJob(ControlledJob aJob) {
142        String id = this.getNextJobID();
143        aJob.setJobID(id);
144        aJob.setJobState(State.WAITING);
145        jobsInProgress.add(aJob);
146        return id;  
147      }
148            
149      /**
150       * Add a collection of jobs
151       * 
152       * @param jobs
153       */
154      public void addJobCollection(Collection<ControlledJob> jobs) {
155        for (ControlledJob job : jobs) {
156          addJob(job);
157        }
158      }
159            
160      /**
161       * @return the thread state
162       */
163      public ThreadState getThreadState() {
164        return this.runnerState;
165      }
166            
167      /**
168       * set the thread state to STOPPING so that the 
169       * thread will stop when it wakes up.
170       */
171      public void stop() {
172        this.runnerState = ThreadState.STOPPING;
173      }
174            
175      /**
176       * suspend the running thread
177       */
178      public void suspend () {
179        if (this.runnerState == ThreadState.RUNNING) {
180          this.runnerState = ThreadState.SUSPENDED;
181        }
182      }
183            
184      /**
185       * resume the suspended thread
186       */
187      public void resume () {
188        if (this.runnerState == ThreadState.SUSPENDED) {
189          this.runnerState = ThreadState.RUNNING;
190        }
191      }
192            
193      synchronized public boolean allFinished() {
194        return jobsInProgress.isEmpty();
195      }
196            
197      /**
198       *  The main loop for the thread.
199       *  The loop does the following:
200       *    Check the states of the running jobs
201       *    Update the states of waiting jobs
202       *    Submit the jobs in ready state
203       */
204      public void run() {
205        try {
206          this.runnerState = ThreadState.RUNNING;
207          while (true) {
208            while (this.runnerState == ThreadState.SUSPENDED) {
209              try {
210                Thread.sleep(5000);
211              }
212              catch (Exception e) {
213                //TODO the thread was interrupted, do something!!!
214              }
215            }
216            
217            synchronized(this) {
218              Iterator<ControlledJob> it = jobsInProgress.iterator();
219              while(it.hasNext()) {
220                ControlledJob j = it.next();
221                LOG.debug("Checking state of job "+j);
222                switch(j.checkState()) {
223                case SUCCESS:
224                  successfulJobs.add(j);
225                  it.remove();
226                  break;
227                case FAILED:
228                case DEPENDENT_FAILED:
229                  failedJobs.add(j);
230                  it.remove();
231                  break;
232                case READY:
233                  j.submit();
234                  break;
235                case RUNNING:
236                case WAITING:
237                  //Do Nothing
238                  break;
239                }
240              }
241            }
242            
243            if (this.runnerState != ThreadState.RUNNING && 
244                this.runnerState != ThreadState.SUSPENDED) {
245              break;
246            }
247            try {
248              Thread.sleep(5000);
249            }
250            catch (Exception e) {
251              //TODO the thread was interrupted, do something!!!
252            }
253            if (this.runnerState != ThreadState.RUNNING && 
254                this.runnerState != ThreadState.SUSPENDED) {
255              break;
256            }
257          }
258        }catch(Throwable t) {
259          LOG.error("Error while trying to run jobs.",t);
260          //Mark all jobs as failed because we got something bad.
261          failAllJobs(t);
262        }
263        this.runnerState = ThreadState.STOPPED;
264      }
265    
266      synchronized private void failAllJobs(Throwable t) {
267        String message = "Unexpected System Error Occured: "+
268        StringUtils.stringifyException(t);
269        Iterator<ControlledJob> it = jobsInProgress.iterator();
270        while(it.hasNext()) {
271          ControlledJob j = it.next();
272          try {
273            j.failJob(message);
274          } catch (IOException e) {
275            LOG.error("Error while tyring to clean up "+j.getJobName(), e);
276          } catch (InterruptedException e) {
277            LOG.error("Error while tyring to clean up "+j.getJobName(), e);
278          } finally {
279            failedJobs.add(j);
280            it.remove();
281          }
282        }
283      }
284    }