001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client;
020    
021    import java.net.InetSocketAddress;
022    import java.util.ArrayList;
023    import java.util.List;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.io.Text;
031    import org.apache.hadoop.ipc.RPC;
032    import org.apache.hadoop.yarn.api.ClientRMProtocol;
033    import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
034    import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
035    import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
036    import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
037    import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
038    import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
039    import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
040    import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
041    import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
042    import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
043    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
044    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
045    import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
046    import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
047    import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
048    import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
049    import org.apache.hadoop.yarn.api.records.ApplicationId;
050    import org.apache.hadoop.yarn.api.records.ApplicationReport;
051    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
052    import org.apache.hadoop.yarn.api.records.DelegationToken;
053    import org.apache.hadoop.yarn.api.records.NodeReport;
054    import org.apache.hadoop.yarn.api.records.QueueInfo;
055    import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
056    import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
057    import org.apache.hadoop.yarn.conf.YarnConfiguration;
058    import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
059    import org.apache.hadoop.yarn.ipc.YarnRPC;
060    import org.apache.hadoop.yarn.service.AbstractService;
061    import org.apache.hadoop.yarn.util.Records;
062    
063    @InterfaceAudience.Public
064    @InterfaceStability.Evolving
065    public class YarnClientImpl extends AbstractService implements YarnClient {
066    
067      private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
068    
069      protected ClientRMProtocol rmClient;
070      protected InetSocketAddress rmAddress;
071    
072      private static final String ROOT = "root";
073    
074      public YarnClientImpl() {
075        this(null);
076      }
077      
078      public YarnClientImpl(InetSocketAddress rmAddress) {
079        super(YarnClientImpl.class.getName());
080        this.rmAddress = rmAddress;
081      }
082    
083      private static InetSocketAddress getRmAddress(Configuration conf) {
084        return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
085          YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
086      }
087    
088      @Override
089      public synchronized void init(Configuration conf) {
090        if (this.rmAddress == null) {
091          this.rmAddress = getRmAddress(conf);
092        }
093        super.init(conf);
094      }
095    
096      @Override
097      public synchronized void start() {
098        YarnRPC rpc = YarnRPC.create(getConfig());
099    
100        this.rmClient = (ClientRMProtocol) rpc.getProxy(
101            ClientRMProtocol.class, rmAddress, getConfig());
102        if (LOG.isDebugEnabled()) {
103          LOG.debug("Connecting to ResourceManager at " + rmAddress);
104        }
105        super.start();
106      }
107    
108      @Override
109      public synchronized void stop() {
110        if (this.rmClient != null) {
111          RPC.stopProxy(this.rmClient);
112        }
113        super.stop();
114      }
115    
116      @Override
117      public GetNewApplicationResponse getNewApplication()
118          throws YarnRemoteException {
119        GetNewApplicationRequest request =
120            Records.newRecord(GetNewApplicationRequest.class);
121        return rmClient.getNewApplication(request);
122      }
123    
124      @Override
125      public ApplicationId
126          submitApplication(ApplicationSubmissionContext appContext)
127              throws YarnRemoteException {
128        ApplicationId applicationId = appContext.getApplicationId();
129        appContext.setApplicationId(applicationId);
130        SubmitApplicationRequest request =
131            Records.newRecord(SubmitApplicationRequest.class);
132        request.setApplicationSubmissionContext(appContext);
133        rmClient.submitApplication(request);
134        LOG.info("Submitted application " + applicationId + " to ResourceManager"
135            + " at " + rmAddress);
136        return applicationId;
137      }
138    
139      @Override
140      public void killApplication(ApplicationId applicationId)
141          throws YarnRemoteException {
142        LOG.info("Killing application " + applicationId);
143        KillApplicationRequest request =
144            Records.newRecord(KillApplicationRequest.class);
145        request.setApplicationId(applicationId);
146        rmClient.forceKillApplication(request);
147      }
148    
149      @Override
150      public ApplicationReport getApplicationReport(ApplicationId appId)
151          throws YarnRemoteException {
152        GetApplicationReportRequest request =
153            Records.newRecord(GetApplicationReportRequest.class);
154        request.setApplicationId(appId);
155        GetApplicationReportResponse response =
156            rmClient.getApplicationReport(request);
157        return response.getApplicationReport();
158      }
159    
160      @Override
161      public List<ApplicationReport> getApplicationList()
162          throws YarnRemoteException {
163        GetAllApplicationsRequest request =
164            Records.newRecord(GetAllApplicationsRequest.class);
165        GetAllApplicationsResponse response = rmClient.getAllApplications(request);
166        return response.getApplicationList();
167      }
168    
169      @Override
170      public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException {
171        GetClusterMetricsRequest request =
172            Records.newRecord(GetClusterMetricsRequest.class);
173        GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
174        return response.getClusterMetrics();
175      }
176    
177      @Override
178      public List<NodeReport> getNodeReports() throws YarnRemoteException {
179        GetClusterNodesRequest request =
180            Records.newRecord(GetClusterNodesRequest.class);
181        GetClusterNodesResponse response = rmClient.getClusterNodes(request);
182        return response.getNodeReports();
183      }
184    
185      @Override
186      public DelegationToken getRMDelegationToken(Text renewer)
187          throws YarnRemoteException {
188        /* get the token from RM */
189        GetDelegationTokenRequest rmDTRequest =
190            Records.newRecord(GetDelegationTokenRequest.class);
191        rmDTRequest.setRenewer(renewer.toString());
192        GetDelegationTokenResponse response =
193            rmClient.getDelegationToken(rmDTRequest);
194        return response.getRMDelegationToken();
195      }
196    
197    
198      private GetQueueInfoRequest
199          getQueueInfoRequest(String queueName, boolean includeApplications,
200              boolean includeChildQueues, boolean recursive) {
201        GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
202        request.setQueueName(queueName);
203        request.setIncludeApplications(includeApplications);
204        request.setIncludeChildQueues(includeChildQueues);
205        request.setRecursive(recursive);
206        return request;
207      }
208    
209      @Override
210      public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException {
211        GetQueueInfoRequest request =
212            getQueueInfoRequest(queueName, true, false, false);
213        Records.newRecord(GetQueueInfoRequest.class);
214        return rmClient.getQueueInfo(request).getQueueInfo();
215      }
216    
217      @Override
218      public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException {
219        GetQueueUserAclsInfoRequest request =
220            Records.newRecord(GetQueueUserAclsInfoRequest.class);
221        return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
222      }
223    
224      @Override
225      public List<QueueInfo> getAllQueues() throws YarnRemoteException {
226        List<QueueInfo> queues = new ArrayList<QueueInfo>();
227    
228        QueueInfo rootQueue =
229            rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
230              .getQueueInfo();
231        getChildQueues(rootQueue, queues, true);
232        return queues;
233      }
234    
235      @Override
236      public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException {
237        List<QueueInfo> queues = new ArrayList<QueueInfo>();
238    
239        QueueInfo rootQueue =
240            rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
241              .getQueueInfo();
242        getChildQueues(rootQueue, queues, false);
243        return queues;
244      }
245    
246      @Override
247      public List<QueueInfo> getChildQueueInfos(String parent)
248          throws YarnRemoteException {
249        List<QueueInfo> queues = new ArrayList<QueueInfo>();
250    
251        QueueInfo parentQueue =
252            rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
253              .getQueueInfo();
254        getChildQueues(parentQueue, queues, true);
255        return queues;
256      }
257    
258      private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
259          boolean recursive) {
260        List<QueueInfo> childQueues = parent.getChildQueues();
261    
262        for (QueueInfo child : childQueues) {
263          queues.add(child);
264          if (recursive) {
265            getChildQueues(child, queues, recursive);
266          }
267        }
268      }
269    }