001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.jobcontrol; 020 021 import java.io.IOException; 022 import java.util.ArrayList; 023 import java.util.Collection; 024 import java.util.Iterator; 025 import java.util.LinkedList; 026 import java.util.List; 027 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import org.apache.hadoop.classification.InterfaceAudience; 031 import org.apache.hadoop.classification.InterfaceStability; 032 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State; 033 import org.apache.hadoop.util.StringUtils; 034 035 /** 036 * This class encapsulates a set of MapReduce jobs and its dependency. 037 * 038 * It tracks the states of the jobs by placing them into different tables 039 * according to their states. 040 * 041 * This class provides APIs for the client app to add a job to the group 042 * and to get the jobs in the group in different states. When a job is 043 * added, an ID unique to the group is assigned to the job. 044 * 045 * This class has a thread that submits jobs when they become ready, 046 * monitors the states of the running jobs, and updates the states of jobs 047 * based on the state changes of their depending jobs states. The class 048 * provides APIs for suspending/resuming the thread, and 049 * for stopping the thread. 050 * 051 */ 052 @InterfaceAudience.Public 053 @InterfaceStability.Evolving 054 public class JobControl implements Runnable { 055 private static final Log LOG = LogFactory.getLog(JobControl.class); 056 057 // The thread can be in one of the following state 058 public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY}; 059 060 private ThreadState runnerState; // the thread state 061 062 private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>(); 063 private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>(); 064 private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>(); 065 066 private long nextJobID; 067 private String groupName; 068 069 /** 070 * Construct a job control for a group of jobs. 071 * @param groupName a name identifying this group 072 */ 073 public JobControl(String groupName) { 074 this.nextJobID = -1; 075 this.groupName = groupName; 076 this.runnerState = ThreadState.READY; 077 } 078 079 private static List<ControlledJob> toList( 080 LinkedList<ControlledJob> jobs) { 081 ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>(); 082 synchronized (jobs) { 083 for (ControlledJob job : jobs) { 084 retv.add(job); 085 } 086 } 087 return retv; 088 } 089 090 synchronized private List<ControlledJob> getJobsIn(State state) { 091 LinkedList<ControlledJob> l = new LinkedList<ControlledJob>(); 092 for(ControlledJob j: jobsInProgress) { 093 if(j.getJobState() == state) { 094 l.add(j); 095 } 096 } 097 return l; 098 } 099 100 /** 101 * @return the jobs in the waiting state 102 */ 103 public List<ControlledJob> getWaitingJobList() { 104 return getJobsIn(State.WAITING); 105 } 106 107 /** 108 * @return the jobs in the running state 109 */ 110 public List<ControlledJob> getRunningJobList() { 111 return getJobsIn(State.RUNNING); 112 } 113 114 /** 115 * @return the jobs in the ready state 116 */ 117 public List<ControlledJob> getReadyJobsList() { 118 return getJobsIn(State.READY); 119 } 120 121 /** 122 * @return the jobs in the success state 123 */ 124 public List<ControlledJob> getSuccessfulJobList() { 125 return toList(this.successfulJobs); 126 } 127 128 public List<ControlledJob> getFailedJobList() { 129 return toList(this.failedJobs); 130 } 131 132 private String getNextJobID() { 133 nextJobID += 1; 134 return this.groupName + this.nextJobID; 135 } 136 137 /** 138 * Add a new job. 139 * @param aJob the new job 140 */ 141 synchronized public String addJob(ControlledJob aJob) { 142 String id = this.getNextJobID(); 143 aJob.setJobID(id); 144 aJob.setJobState(State.WAITING); 145 jobsInProgress.add(aJob); 146 return id; 147 } 148 149 /** 150 * Add a collection of jobs 151 * 152 * @param jobs 153 */ 154 public void addJobCollection(Collection<ControlledJob> jobs) { 155 for (ControlledJob job : jobs) { 156 addJob(job); 157 } 158 } 159 160 /** 161 * @return the thread state 162 */ 163 public ThreadState getThreadState() { 164 return this.runnerState; 165 } 166 167 /** 168 * set the thread state to STOPPING so that the 169 * thread will stop when it wakes up. 170 */ 171 public void stop() { 172 this.runnerState = ThreadState.STOPPING; 173 } 174 175 /** 176 * suspend the running thread 177 */ 178 public void suspend () { 179 if (this.runnerState == ThreadState.RUNNING) { 180 this.runnerState = ThreadState.SUSPENDED; 181 } 182 } 183 184 /** 185 * resume the suspended thread 186 */ 187 public void resume () { 188 if (this.runnerState == ThreadState.SUSPENDED) { 189 this.runnerState = ThreadState.RUNNING; 190 } 191 } 192 193 synchronized public boolean allFinished() { 194 return jobsInProgress.isEmpty(); 195 } 196 197 /** 198 * The main loop for the thread. 199 * The loop does the following: 200 * Check the states of the running jobs 201 * Update the states of waiting jobs 202 * Submit the jobs in ready state 203 */ 204 public void run() { 205 try { 206 this.runnerState = ThreadState.RUNNING; 207 while (true) { 208 while (this.runnerState == ThreadState.SUSPENDED) { 209 try { 210 Thread.sleep(5000); 211 } 212 catch (Exception e) { 213 //TODO the thread was interrupted, do something!!! 214 } 215 } 216 217 synchronized(this) { 218 Iterator<ControlledJob> it = jobsInProgress.iterator(); 219 while(it.hasNext()) { 220 ControlledJob j = it.next(); 221 LOG.debug("Checking state of job "+j); 222 switch(j.checkState()) { 223 case SUCCESS: 224 successfulJobs.add(j); 225 it.remove(); 226 break; 227 case FAILED: 228 case DEPENDENT_FAILED: 229 failedJobs.add(j); 230 it.remove(); 231 break; 232 case READY: 233 j.submit(); 234 break; 235 case RUNNING: 236 case WAITING: 237 //Do Nothing 238 break; 239 } 240 } 241 } 242 243 if (this.runnerState != ThreadState.RUNNING && 244 this.runnerState != ThreadState.SUSPENDED) { 245 break; 246 } 247 try { 248 Thread.sleep(5000); 249 } 250 catch (Exception e) { 251 //TODO the thread was interrupted, do something!!! 252 } 253 if (this.runnerState != ThreadState.RUNNING && 254 this.runnerState != ThreadState.SUSPENDED) { 255 break; 256 } 257 } 258 }catch(Throwable t) { 259 LOG.error("Error while trying to run jobs.",t); 260 //Mark all jobs as failed because we got something bad. 261 failAllJobs(t); 262 } 263 this.runnerState = ThreadState.STOPPED; 264 } 265 266 synchronized private void failAllJobs(Throwable t) { 267 String message = "Unexpected System Error Occured: "+ 268 StringUtils.stringifyException(t); 269 Iterator<ControlledJob> it = jobsInProgress.iterator(); 270 while(it.hasNext()) { 271 ControlledJob j = it.next(); 272 try { 273 j.failJob(message); 274 } catch (IOException e) { 275 LOG.error("Error while tyring to clean up "+j.getJobName(), e); 276 } catch (InterruptedException e) { 277 LOG.error("Error while tyring to clean up "+j.getJobName(), e); 278 } finally { 279 failedJobs.add(j); 280 it.remove(); 281 } 282 } 283 } 284 }