MongoDBのHadoopドライブ紹介
4447 ワード
MongoDBのHadoopドライブ紹介-----------------
1.いくつかの概念HadoopはApacheオープンソースの分散計算フレームワークであり、分散ファイルシステムDFSと分散計算モデルMapReduceを含む.MongoDBはドキュメント向けの分散データベースであり、NoSqlの一種である.ここで説明するのはMongoDBのHadoop駆動である.ここではMongoDBをMapReduceの入力源とする.MapReduceの利点を活用してMongoDBのデータの処理と計算を行う.
2.MongoDBのHadoopドライバは現在のバージョンのHadoopドライバを駆動するかテストバージョンを駆動するか、まだ実際の生産環境に適用できません.
下のサイトからhttps://github.com/mongodb/mongo-hadoop最新のドライバパッケージにダウンロードします.以下に依存する説明を示します.は現在、最新のHadoop 0.20.203バージョン、またはCloudera CHD 3で を行うことを推奨しています. MongoDBのバージョンは1.8+ が望ましいまたMongoDBのjavaドライバは2.5.3+ でなければなりません
いくつかの特徴があります.はHadoopのInputとOutput適合層を提供し、データの読み込みと書き出し に読む.は、XMLプロファイルを使用して構成できるパラメータの大部分を構成できます.プロファイルでクエリーするフィールド、クエリー条件、ソートポリシーなどの を定義できます.
現在サポートされていない機能:現在、マルチシェードのソースデータ読み出し はサポートされていない.はまだデータのsplit操作 をサポートしていない.
3.コード分析
そのexamplesのWordCountを実行します.JAvaコード
4.ブロック化機構の簡単な説明ここでは、異なるshardに対するsplit動作は実現されていない.すなわち、異なるshard上に分布するデータに対しては、1つのMap動作しか生成されない.
ここで私は1つの分片の構想を提供して、興味があるのは討論することができます.
Collectionがブロック化されると、configデータベースが生成されます.このデータベースの下にchunksというテーブルがあり、各chunkにstart_が記録されています.rowとend_row,これらのchunkは異なるshard上に分布することができ,このCollectionを解析することによって各shard上のchunk情報を得ることができ,それによって各shard上のchunk情報を1つのInputSplitに組み合わせることができ,これがここのMongoInputSplitであり,これではMongoInputFormatというクラスのgetSplitsという方法を修正し,chunksテーブルの解析を加えるだけである.shardの情報を得ることで、マルチsplitのMap操作を実現することができ、異なるShardに対して、各MapはローカルのMongosエージェントサービスを呼び出し、モバイルデータではなくモバイルコンピューティングの目的を実現することができる.
これは私の考えにすぎません.興味のある友达は一緒に討論してもいいです.
私は具体的な実現を送ります.
5.参考*https://github.com/mongodb/mongo-hadoop
* http://www.mongodb.org/display/DOCS/Java+Language+Center
1.いくつかの概念HadoopはApacheオープンソースの分散計算フレームワークであり、分散ファイルシステムDFSと分散計算モデルMapReduceを含む.MongoDBはドキュメント向けの分散データベースであり、NoSqlの一種である.ここで説明するのはMongoDBのHadoop駆動である.ここではMongoDBをMapReduceの入力源とする.MapReduceの利点を活用してMongoDBのデータの処理と計算を行う.
2.MongoDBのHadoopドライバは現在のバージョンのHadoopドライバを駆動するかテストバージョンを駆動するか、まだ実際の生産環境に適用できません.
下のサイトからhttps://github.com/mongodb/mongo-hadoop最新のドライバパッケージにダウンロードします.以下に依存する説明を示します.
いくつかの特徴があります.
現在サポートされていない機能:
3.コード分析
そのexamplesのWordCountを実行します.JAvaコード
// MongoDB test in ,
/**
* test.in db.in.insert( { x : "eliot was here" } ) db.in.insert( { x :
* "eliot is here" } ) db.in.insert( { x : "who is here" } ) =
*/
public class WordCount {
private static final Log log = LogFactory.getLog( WordCount.class );
// Map
public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
private final static IntWritable one = new IntWritable( 1 );
private final Text word = new Text();
public void map( Object key , BSONObject value , Context context ) throws IOException, InterruptedException{
System.out.println( "key: " + key );
System.out.println( "value: " + value );
//
final StringTokenizer itr = new StringTokenizer( value.get( "x" ).toString() );
while ( itr.hasMoreTokens() ) {
word.set( itr.nextToken() );
context.write( word, one ); // key , value 1
}
}
}
// Reduce ,
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private final IntWritable result = new IntWritable();
public void reduce( Text key , Iterable<IntWritable> values , Context context ) throws IOException, InterruptedException{
// , value
int sum = 0;
for ( final IntWritable val : values ) {
sum += val.get();
}
result.set( sum );
context.write( key, result ); // key ,value
}
}
public static void main( String[] args ) throws Exception{
final Configuration conf = new Configuration();
// MongoDB , MongoDB, 27017
MongoConfigUtil.setInputURI( conf, "mongodb://localhost/test.in" );
MongoConfigUtil.setOutputURI( conf, "mongodb://localhost/test.out" );
System.out.println( "Conf: " + conf );
final Job job = new Job( conf , "word count" );
job.setJarByClass( WordCount.class );
// Mapper,Reduce Combiner
job.setMapperClass( TokenizerMapper.class );
job.setCombinerClass( IntSumReducer.class );
job.setReducerClass( IntSumReducer.class );
// Mapper Reduce key/value
job.setOutputKeyClass( Text.class );
job.setOutputValueClass( IntWritable.class );
// InputFormat OutputFormat
job.setInputFormatClass( MongoInputFormat.class );
job.setOutputFormatClass( MongoOutputFormat.class );
System.exit( job.waitForCompletion( true ) ? 0 : 1 );
}
}
4.ブロック化機構の簡単な説明ここでは、異なるshardに対するsplit動作は実現されていない.すなわち、異なるshard上に分布するデータに対しては、1つのMap動作しか生成されない.
ここで私は1つの分片の構想を提供して、興味があるのは討論することができます.
Collectionがブロック化されると、configデータベースが生成されます.このデータベースの下にchunksというテーブルがあり、各chunkにstart_が記録されています.rowとend_row,これらのchunkは異なるshard上に分布することができ,このCollectionを解析することによって各shard上のchunk情報を得ることができ,それによって各shard上のchunk情報を1つのInputSplitに組み合わせることができ,これがここのMongoInputSplitであり,これではMongoInputFormatというクラスのgetSplitsという方法を修正し,chunksテーブルの解析を加えるだけである.shardの情報を得ることで、マルチsplitのMap操作を実現することができ、異なるShardに対して、各MapはローカルのMongosエージェントサービスを呼び出し、モバイルデータではなくモバイルコンピューティングの目的を実現することができる.
これは私の考えにすぎません.興味のある友达は一緒に討論してもいいです.
私は具体的な実現を送ります.
5.参考*https://github.com/mongodb/mongo-hadoop
* http://www.mongodb.org/display/DOCS/Java+Language+Center