001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapred;
019    
020    import java.io.IOException;
021    
022    import org.apache.hadoop.classification.InterfaceAudience;
023    import org.apache.hadoop.classification.InterfaceStability;
024    import org.apache.hadoop.fs.FileSystem;
025    import org.apache.hadoop.fs.Path;
026    import org.apache.hadoop.io.BytesWritable;
027    import org.apache.hadoop.io.SequenceFile;
028    import org.apache.hadoop.io.SequenceFile.CompressionType;
029    import org.apache.hadoop.io.Writable;
030    import org.apache.hadoop.io.WritableComparable;
031    import org.apache.hadoop.io.compress.CompressionCodec;
032    import org.apache.hadoop.io.compress.DefaultCodec;
033    import org.apache.hadoop.util.Progressable;
034    import org.apache.hadoop.util.ReflectionUtils;
035    
036    /** 
037     * An {@link OutputFormat} that writes keys, values to 
038     * {@link SequenceFile}s in binary(raw) format
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Stable
042    public class SequenceFileAsBinaryOutputFormat 
043     extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
044    
045      /** 
046       * Inner class used for appendRaw
047       */
048      static protected class WritableValueBytes extends org.apache.hadoop.mapreduce
049          .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
050      }
051    
052      /**
053       * Set the key class for the {@link SequenceFile}
054       * <p>This allows the user to specify the key class to be different 
055       * from the actual class ({@link BytesWritable}) used for writing </p>
056       * 
057       * @param conf the {@link JobConf} to modify
058       * @param theClass the SequenceFile output key class.
059       */
060      static public void setSequenceFileOutputKeyClass(JobConf conf, 
061                                                       Class<?> theClass) {
062        conf.setClass(org.apache.hadoop.mapreduce.lib.output.
063          SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
064      }
065    
066      /**
067       * Set the value class for the {@link SequenceFile}
068       * <p>This allows the user to specify the value class to be different 
069       * from the actual class ({@link BytesWritable}) used for writing </p>
070       * 
071       * @param conf the {@link JobConf} to modify
072       * @param theClass the SequenceFile output key class.
073       */
074      static public void setSequenceFileOutputValueClass(JobConf conf, 
075                                                         Class<?> theClass) {
076        conf.setClass(org.apache.hadoop.mapreduce.lib.output.
077          SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
078      }
079    
080      /**
081       * Get the key class for the {@link SequenceFile}
082       * 
083       * @return the key class of the {@link SequenceFile}
084       */
085      static public Class<? extends WritableComparable> getSequenceFileOutputKeyClass(JobConf conf) { 
086        return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
087          SequenceFileAsBinaryOutputFormat.KEY_CLASS, 
088          conf.getOutputKeyClass().asSubclass(WritableComparable.class),
089          WritableComparable.class);
090      }
091    
092      /**
093       * Get the value class for the {@link SequenceFile}
094       * 
095       * @return the value class of the {@link SequenceFile}
096       */
097      static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf conf) { 
098        return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
099          SequenceFileAsBinaryOutputFormat.VALUE_CLASS, 
100          conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
101      }
102      
103      @Override 
104      public RecordWriter <BytesWritable, BytesWritable> 
105                 getRecordWriter(FileSystem ignored, JobConf job,
106                                 String name, Progressable progress)
107        throws IOException {
108        // get the path of the temporary output file 
109        Path file = FileOutputFormat.getTaskOutputPath(job, name);
110        
111        FileSystem fs = file.getFileSystem(job);
112        CompressionCodec codec = null;
113        CompressionType compressionType = CompressionType.NONE;
114        if (getCompressOutput(job)) {
115          // find the kind of compression to do
116          compressionType = getOutputCompressionType(job);
117    
118          // find the right codec
119          Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
120              DefaultCodec.class);
121          codec = ReflectionUtils.newInstance(codecClass, job);
122        }
123        final SequenceFile.Writer out = 
124          SequenceFile.createWriter(fs, job, file,
125                        getSequenceFileOutputKeyClass(job),
126                        getSequenceFileOutputValueClass(job),
127                        compressionType,
128                        codec,
129                        progress);
130    
131        return new RecordWriter<BytesWritable, BytesWritable>() {
132            
133            private WritableValueBytes wvaluebytes = new WritableValueBytes();
134    
135            public void write(BytesWritable bkey, BytesWritable bvalue)
136              throws IOException {
137    
138              wvaluebytes.reset(bvalue);
139              out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
140              wvaluebytes.reset(null);
141            }
142    
143            public void close(Reporter reporter) throws IOException { 
144              out.close();
145            }
146    
147          };
148    
149      }
150    
151      @Override 
152      public void checkOutputSpecs(FileSystem ignored, JobConf job) 
153                throws IOException {
154        super.checkOutputSpecs(ignored, job);
155        if (getCompressOutput(job) && 
156            getOutputCompressionType(job) == CompressionType.RECORD ){
157            throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
158                        + "doesn't support Record Compression" );
159        }
160    
161      }
162    
163    }