/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;
import java.text.NumberFormat;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.security.TokenCache;

/** A base class for {@link OutputFormat}s that write to {@link FileSystem}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {

  /** Construct output file names so that, when an output directory listing is
   * sorted lexicographically, positions correspond to output partitions. */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
  protected static final String PART = "part";
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }
  private FileOutputCommitter committer = null;
  public static final String COMPRESS = "mapreduce.output.fileoutputformat.compress";
  public static final String COMPRESS_CODEC =
      "mapreduce.output.fileoutputformat.compress.codec";
  public static final String COMPRESS_TYPE =
      "mapreduce.output.fileoutputformat.compress.type";
  public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";

  /**
   * Set whether the output of the job is compressed.
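   *
   * <p>A minimal sketch of driver-side use, assuming a configured
   * {@link Job} instance named <code>job</code>:</p>
   * <pre>{@code
   * FileOutputFormat.setCompressOutput(job, true);
   * }</pre>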
   * @param job the job to modify
   * @param compress should the output of the job be compressed?
   */
  public static void setCompressOutput(Job job, boolean compress) {
    job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
  }

  /**
   * Is the job output compressed?
   * @param job the Job to look in
   * @return <code>true</code> if the job output should be compressed,
   *         <code>false</code> otherwise
   */
  public static boolean getCompressOutput(JobContext job) {
    return job.getConfiguration().getBoolean(
      FileOutputFormat.COMPRESS, false);
  }

  /**
   * Set the {@link CompressionCodec} to be used to compress job outputs.
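   *
   * <p>For example, a job might request gzip-compressed output; this sketch
   * assumes the stock <code>org.apache.hadoop.io.compress.GzipCodec</code>:</p>
   * <pre>{@code
   * FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
   * }</pre>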
   * @param job the job to modify
   * @param codecClass the {@link CompressionCodec} to be used to
   *                   compress the job outputs
   */
  public static void
  setOutputCompressorClass(Job job,
                           Class<? extends CompressionCodec> codecClass) {
    setCompressOutput(job, true);
    job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC,
                                    codecClass,
                                    CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the job outputs.
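   *
   * <p>A sketch of typical use inside an output format, assuming the stock
   * <code>org.apache.hadoop.io.compress.DefaultCodec</code> as the
   * fallback:</p>
   * <pre>{@code
   * Class<? extends CompressionCodec> codecClass =
   *     FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);
   * CompressionCodec codec =
   *     ReflectionUtils.newInstance(codecClass, job.getConfiguration());
   * }</pre>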
   * @param job the {@link Job} to look in
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} to be used to compress the
   *         job outputs
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public static Class<? extends CompressionCodec>
  getOutputCompressorClass(JobContext job,
                           Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    Configuration conf = job.getConfiguration();
    String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass =
          conf.getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }

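  /**
   * Get the {@link RecordWriter} for the given task. Concrete subclasses
   * (e.g. text or sequence-file output formats) create the actual writer
   * for the task's output file here.
   *
   * @param job the information about the current task
   * @return a {@link RecordWriter} to write the output for the job
   * @throws IOException if the writer cannot be created
   */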
  public abstract RecordWriter<K, V>
      getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException;

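  /**
   * Check that the output specification is valid: the output directory must
   * be set and must not already exist, and any delegation tokens needed for
   * its file system are obtained up front.
   *
   * @param job information about the job
   * @throws InvalidJobConfException if the output directory is not set
   * @throws FileAlreadyExistsException if the output directory already exists
   */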
  public void checkOutputSpecs(JobContext job
                               ) throws FileAlreadyExistsException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null) {
      throw new InvalidJobConfException("Output directory not set.");
    }

    // get delegation token for outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { outDir }, job.getConfiguration());

    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir +
                                           " already exists");
    }
  }

  /**
   * Set the {@link Path} of the output directory for the map-reduce job.
   *
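   * <p>A minimal sketch of driver-side use; the path below is hypothetical:</p>
   * <pre>{@code
   * FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/out"));
   * }</pre>
   *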
   * @param job the job to modify
   * @param outputDir the {@link Path} of the output directory for
   *                  the map-reduce job.
   */
  public static void setOutputPath(Job job, Path outputDir) throws IOException {
    outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
        outputDir);
    job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
  }

  /**
   * Get the {@link Path} to the output directory for the map-reduce job.
   *
   * @param job the {@link JobContext} to look in
   * @return the {@link Path} to the output directory for the map-reduce job,
   *         or <code>null</code> if it is not set
   * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
   */
  public static Path getOutputPath(JobContext job) {
    String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
    return name == null ? null : new Path(name);
  }

  /**
   * Get the {@link Path} to the task's temporary output directory
   * for the map-reduce job.
   *
   * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
   *
   * <p>Some applications need to create/write-to side-files, which differ from
   * the actual job-outputs.
   *
   * <p>In such cases there could be issues with two instances of the same TIP
   * (running simultaneously, e.g. speculative tasks) trying to open/write-to
   * the same file (path) on HDFS. Hence the application-writer would have to
   * pick unique names per task-attempt (e.g. using the attempt id, say
   * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p>
   *
   * <p>To get around this, the Map-Reduce framework maintains a special
   * <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
   * sub-directory for each task-attempt on HDFS where the output of the
   * task-attempt goes. On successful completion of the task-attempt, the files
   * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
   * (only) are <i>promoted</i> to
   * <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>, while the
   * framework discards the sub-directories of unsuccessful task-attempts.
   * This is completely transparent to the application.</p>
   *
   * <p>The application-writer can take advantage of this by creating any
   * side-files required in a work directory during execution of a task, i.e.
   * via {@link #getWorkOutputPath(TaskInputOutputContext)}, and the framework
   * will move them out similarly; thus the task does not have to pick unique
   * paths per task-attempt.</p>
   *
   * <p>The entire discussion holds true for maps of jobs with
   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
   * goes directly to HDFS.</p>
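   *
   * <p>A sketch of mapper-side use; the side-file name
   * <code>filter-stats</code> is hypothetical:</p>
   * <pre>{@code
   * Path workDir = FileOutputFormat.getWorkOutputPath(context);
   * Path sideFile = new Path(workDir, "filter-stats");
   * FSDataOutputStream out =
   *     sideFile.getFileSystem(context.getConfiguration()).create(sideFile);
   * }</pre>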
   *
   * @param context the context of the task
   * @return the {@link Path} to the task's temporary output directory
   *         for the map-reduce job.
   */
  public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
                                       ) throws IOException,
                                                InterruptedException {
    FileOutputCommitter committer = (FileOutputCommitter)
      context.getOutputCommitter();
    return committer.getWorkPath();
  }

  /**
   * Helper function to generate a {@link Path} for a file that is unique for
   * the task within the job output directory.
   *
   * <p>The path can be used to create custom files from within the map and
   * reduce tasks. The path name will be unique for each task. The path parent
   * will be the job output directory.</p>
   *
   * <p>This method uses the {@link #getUniqueFile} method to make the file name
   * unique for the task.</p>
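   *
   * <p>For example (a sketch; the name <code>histogram</code> and extension
   * are hypothetical):</p>
   * <pre>{@code
   * Path p = FileOutputFormat.getPathForWorkFile(context, "histogram", ".bin");
   * }</pre>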
   *
   * @param context the context for the task.
   * @param name the name for the file.
   * @param extension the extension for the file.
   * @return a unique path across all tasks of the job.
   */
  public static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context,
                                        String name,
                                        String extension
                                        ) throws IOException,
                                                 InterruptedException {
    return new Path(getWorkOutputPath(context),
                    getUniqueFile(context, name, extension));
  }

  /**
   * Generate a unique filename based on the task id, name, and extension.
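   *
   * <p>For instance, for the fourth map task (partition 3), a name of
   * <code>part</code> and an extension of <code>.gz</code> would yield
   * <code>part-m-00003.gz</code>.</p>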
   * @param context the task that is calling this
   * @param name the base filename
   * @param extension the filename extension
   * @return a string like $name-[mrsct]-$id$extension
   */
  public static synchronized String getUniqueFile(TaskAttemptContext context,
                                                  String name,
                                                  String extension) {
    TaskID taskId = context.getTaskAttemptID().getTaskID();
    int partition = taskId.getId();
    StringBuilder result = new StringBuilder();
    result.append(name);
    result.append('-');
    result.append(
        TaskID.getRepresentingCharacter(taskId.getTaskType()));
    result.append('-');
    result.append(NUMBER_FORMAT.format(partition));
    result.append(extension);
    return result.toString();
  }

  /**
   * Get the default path and filename for the output format.
   * @param context the task context
   * @param extension an extension to add to the filename
   * @return a full path $output/_temporary/$taskid/part-[mr]-$id
   * @throws IOException if the output committer cannot be obtained
   */
  public Path getDefaultWorkFile(TaskAttemptContext context,
                                 String extension) throws IOException {
    FileOutputCommitter committer =
      (FileOutputCommitter) getOutputCommitter(context);
    return new Path(committer.getWorkPath(), getUniqueFile(context,
        getOutputName(context), extension));
  }

  /**
   * Get the base output name for the output file.
   * @param job the {@link JobContext} to look in
   * @return the base output name, defaulting to <code>"part"</code>
   */
  protected static String getOutputName(JobContext job) {
    return job.getConfiguration().get(BASE_OUTPUT_NAME, PART);
  }

  /**
   * Set the base output name for the output file to be created.
   * @param job the {@link JobContext} to modify
   * @param name the base output name
   */
  protected static void setOutputName(JobContext job, String name) {
    job.getConfiguration().set(BASE_OUTPUT_NAME, name);
  }

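  /**
   * Get the output committer for this output format, creating a
   * {@link FileOutputCommitter} on the configured output path on first use
   * and caching it for subsequent calls.
   *
   * @param context the task context
   * @return a {@link FileOutputCommitter} for the job's output directory
   * @throws IOException if the committer cannot be created
   */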
  public synchronized
      OutputCommitter getOutputCommitter(TaskAttemptContext context
                                         ) throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new FileOutputCommitter(output, context);
    }
    return committer;
  }
}