DBInputFormat


コードはテストをしていないので、先に記録します.
package com.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 *  
 * 1、 mysql jdbc taskTracker lib , 
 *
 */
public class WordCountDB extends Configured implements Tool {
 
 private String OUT_PATH = "hdfs://grid131:9000/output";
 
 public static class Map extends Mapper<LongWritable, MyUser, LongWritable, Text> {
  public void map(LongWritable key, MyUser value, Context context) throws IOException, InterruptedException {
   context.write(key, new Text(value.toString()));
  }
 }
 
 public int run(String[] args) throws Exception {
  Configuration conf = this.getConf();
  DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://grid131:3306/test", "root", "admin");
  
  // , 
  FileSystem fs = FileSystem.get(new URI(OUT_PATH), conf);
  fs.delete(new Path(OUT_PATH), true);
  
  Job job = new Job(conf, WordCountDB.class.getSimpleName());
  job.setJarByClass(WordCountDB.class);
  
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  // reduce, map hdfs 
  job.setNumReduceTasks(0);
  job.setInputFormatClass(DBInputFormat.class);
  
  // 、 
  //DBInputFormat.setInput(job, inputClass, tableName, conditions, orderBy, fieldNames)
  DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id", "name");
  job.setMapperClass(Map.class);
  
  // reduce map ,map 
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);
  
  job.waitForCompletion(true);
  
  return job.isSuccessful()?0:1;
 }
 
 public static void main(String[] args) throws Exception {
  int exit = ToolRunner.run(new WordCount(), args);
  System.exit(exit);
 }
}
class MyUser implements Writable, DBWritable {
 private Long id;
 private String name;
 
 public Long getId() {
  return id;
 }
 public void setId(Long id) {
  this.id = id;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeLong(this.id);
  Text.writeString(out, this.name);
 }
 
 @Override
 public void readFields(DataInput in) throws IOException {
  this.id = in.readLong();
  this.name = Text.readString(in);
 }
 
 @Override
 public void write(PreparedStatement statement) throws SQLException {
  statement.setLong(1, this.id);
  statement.setString(2, this.name);
 }
 
 @Override
 public void readFields(ResultSet resultSet) throws SQLException {
  this.id = resultSet.getLong(1);
  this.name = resultSet.getString(2);
 }
 
 @Override
 public String toString() {
  return this.id + "\t" + this.name;
 }
}