/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

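/**
 * An {@link InputFormat} that reads input data from a SQL table, exposed
 * through the old {@code org.apache.hadoop.mapred} API by delegating to
 * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}. It emits
 * LongWritables containing the record number as key and DBWritables as
 * value. Input is configured with one of the static {@code setInput}
 * overloads below.
 */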
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * @param split The InputSplit to read data for
     * @param inputClass the class of the DBWritable objects to read
     * @param job the job configuration
     * @param conn an open connection to the database
     * @param dbConfig the database configuration for the job
     * @param cond the conditions (WHERE clause) used to select rows
     * @param fields the names of the fields to read from the table
     * @param table the name of the table to read from
     * @throws SQLException if the record reader cannot be created
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String [] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

    /** {@inheritDoc} */
    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A class that does nothing, implementing DBWritable and Writable.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default Constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience Constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
    super.setConf(job);
  }

  /** {@inheritDoc} */
  @SuppressWarnings("unchecked")
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API DBRecordReader in a shim class to deal with API
    // differences between the mapred and mapreduce record readers.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
      super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
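   * <p>A minimal usage sketch; the JDBC driver, connection string, table,
   * condition and field names below are illustrative, and {@code MyJob} and
   * {@code MyRecord} are assumed to be user-supplied classes, with
   * {@code MyRecord} implementing {@link DBWritable}:
   * <pre>{@code
   * JobConf job = new JobConf(MyJob.class);
   * DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
   *     "jdbc:mysql://localhost/mydb", "user", "password");
   * DBInputFormat.setInput(job, MyRecord.class, "employees",
   *     "salary > 0", "id", "id", "name", "salary");
   * }</pre>
   *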
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The conditions with which to select data,
   * e.g. '(updated > 20070101 AND length > 0)'
   * @param orderBy the field names to use in the ORDER BY clause
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String tableName, String conditions, String orderBy, String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
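   * <p>A minimal usage sketch; the driver, connection string, queries,
   * {@code MyJob} and {@code MyRecord} below are illustrative:
   * <pre>{@code
   * JobConf job = new JobConf(MyJob.class);
   * DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
   *     "jdbc:mysql://localhost/mydb", "user", "password");
   * DBInputFormat.setInput(job, MyRecord.class,
   *     "SELECT id, name, salary FROM employees ORDER BY id",
   *     "SELECT COUNT(id) FROM employees");
   * }</pre>
   *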
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   * Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example:
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   * in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String inputQuery, String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
}