001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.security;
020    
021    import java.io.IOException;
022    import java.util.HashSet;
023    import java.util.Set;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.fs.FileSystem;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.hadoop.io.Text;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.Master;
035    import org.apache.hadoop.mapreduce.MRJobConfig;
036    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
037    import org.apache.hadoop.security.Credentials;
038    import org.apache.hadoop.security.UserGroupInformation;
039    import org.apache.hadoop.security.token.Token;
040    import org.apache.hadoop.security.token.TokenIdentifier;
041    
042    
043    /**
044     * This class provides user facing APIs for transferring secrets from
045     * the job client to the tasks.
046     * The secrets can be stored just before submission of jobs and read during
047     * the task execution.  
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Evolving
051    public class TokenCache {
052      
053      private static final Log LOG = LogFactory.getLog(TokenCache.class);
054    
055      
056      /**
057       * auxiliary method to get user's secret keys..
058       * @param alias
059       * @return secret key from the storage
060       */
061      public static byte[] getSecretKey(Credentials credentials, Text alias) {
062        if(credentials == null)
063          return null;
064        return credentials.getSecretKey(alias);
065      }
066      
067      /**
068       * Convenience method to obtain delegation tokens from namenodes 
069       * corresponding to the paths passed.
070       * @param credentials
071       * @param ps array of paths
072       * @param conf configuration
073       * @throws IOException
074       */
075      public static void obtainTokensForNamenodes(Credentials credentials,
076          Path[] ps, Configuration conf) throws IOException {
077        if (!UserGroupInformation.isSecurityEnabled()) {
078          return;
079        }
080        obtainTokensForNamenodesInternal(credentials, ps, conf);
081      }
082    
083      /**
084       * Remove jobtoken referrals which don't make sense in the context
085       * of the task execution.
086       *
087       * @param conf
088       */
089      public static void cleanUpTokenReferral(Configuration conf) {
090        conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
091      }
092    
093      static void obtainTokensForNamenodesInternal(Credentials credentials,
094          Path[] ps, Configuration conf) throws IOException {
095        Set<FileSystem> fsSet = new HashSet<FileSystem>();
096        for(Path p: ps) {
097          fsSet.add(p.getFileSystem(conf));
098        }
099        for (FileSystem fs : fsSet) {
100          obtainTokensForNamenodesInternal(fs, credentials, conf);
101        }
102      }
103    
104      /**
105       * get delegation token for a specific FS
106       * @param fs
107       * @param credentials
108       * @param p
109       * @param conf
110       * @throws IOException
111       */
112      static void obtainTokensForNamenodesInternal(FileSystem fs, 
113          Credentials credentials, Configuration conf) throws IOException {
114        String delegTokenRenewer = Master.getMasterPrincipal(conf);
115        if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
116          throw new IOException(
117              "Can't get Master Kerberos principal for use as renewer");
118        }
119        mergeBinaryTokens(credentials, conf);
120    
121        final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
122                                                         credentials);
123        if (tokens != null) {
124          for (Token<?> token : tokens) {
125            LOG.info("Got dt for " + fs.getUri() + "; "+token);
126          }
127        }
128      }
129    
130      private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
131        String binaryTokenFilename =
132            conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
133        if (binaryTokenFilename != null) {
134          Credentials binary;
135          try {
136            binary = Credentials.readTokenStorageFile(
137                new Path("file:///" +  binaryTokenFilename), conf);
138          } catch (IOException e) {
139            throw new RuntimeException(e);
140          }
141          // supplement existing tokens with the tokens in the binary file
142          creds.mergeAll(binary);
143        }
144      }
145      
146      /**
147       * file name used on HDFS for generated job token
148       */
149      @InterfaceAudience.Private
150      public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
151    
152      /**
153       * conf setting for job tokens cache file name
154       */
155      @InterfaceAudience.Private
156      public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
157      private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
158      
159      /**
160       * load job token from a file
161       * @param conf
162       * @throws IOException
163       */
164      @InterfaceAudience.Private
165      public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
166      throws IOException {
167        Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
168    
169        Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
170    
171        if(LOG.isDebugEnabled()) {
172          LOG.debug("Task: Loaded jobTokenFile from: "+
173              localJobTokenFile.toUri().getPath() 
174              +"; num of sec keys  = " + ts.numberOfSecretKeys() +
175              " Number of tokens " +  ts.numberOfTokens());
176        }
177        return ts;
178      }
179      /**
180       * store job token
181       * @param t
182       */
183      @InterfaceAudience.Private
184      public static void setJobToken(Token<? extends TokenIdentifier> t, 
185          Credentials credentials) {
186        credentials.addToken(JOB_TOKEN, t);
187      }
188      /**
189       * 
190       * @return job token
191       */
192      @SuppressWarnings("unchecked")
193      @InterfaceAudience.Private
194      public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
195        return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
196      }
197    }