MongoDBのHadoopドライブ紹介

4447 ワード

MongoDBのHadoopドライブ紹介-----------------
1.いくつかの概念HadoopはApacheオープンソースの分散計算フレームワークであり、分散ファイルシステムDFSと分散計算モデルMapReduceを含む.MongoDBはドキュメント向けの分散データベースであり、NoSqlの一種である.ここで説明するのはMongoDBのHadoop駆動である.ここではMongoDBをMapReduceの入力源とする.MapReduceの利点を活用してMongoDBのデータの処理と計算を行う.
2.MongoDBのHadoopドライバは現在のバージョンのHadoopドライバを駆動するかテストバージョンを駆動するか、まだ実際の生産環境に適用できません.
下のサイトからhttps://github.com/mongodb/mongo-hadoop最新のドライバパッケージにダウンロードします.以下に依存する説明を示します.
  • は現在、最新のHadoop 0.20.203バージョン、またはCloudera CHD 3で
  • を行うことを推奨しています.
  • MongoDBのバージョンは1.8+
  • が望ましい
  • またMongoDBのjavaドライバは2.5.3+
  • でなければなりません
    いくつかの特徴があります.
  • はHadoopのInputとOutput適合層を提供し、データの読み込みと書き出し
  • に読む.
  • は、XMLプロファイルを使用して構成できるパラメータの大部分を構成できます.プロファイルでクエリーするフィールド、クエリー条件、ソートポリシーなどの
  • を定義できます.
    現在サポートされていない機能:
  • 現在、マルチシェードのソースデータ読み出し
  • はサポートされていない.
  • はまだデータのsplit操作
  • をサポートしていない.
    3.コード分析
    そのexamplesのWordCountを実行します.JAvaコード
    	//    MongoDB test    in         ,       
        /**
     * test.in db.in.insert( { x : "eliot was here" } ) db.in.insert( { x :
     * "eliot is here" } ) db.in.insert( { x : "who is here" } ) =
     */
    public class WordCount {
    
    
        private static final Log log = LogFactory.getLog( WordCount.class );
    
    
    	//     Map  
        public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
    
    
            private final static IntWritable one = new IntWritable( 1 );
            private final Text word = new Text();
    
    
            public void map( Object key , BSONObject value , Context context ) throws IOException, InterruptedException{
    
    
                System.out.println( "key: " + key );
                System.out.println( "value: " + value );
    
    
    			//          
                final StringTokenizer itr = new StringTokenizer( value.get( "x" ).toString() );
                while ( itr.hasMoreTokens() ) {
                    word.set( itr.nextToken() );
                    context.write( word, one ); //    key  , value 1
                }
            }
        }
    
    
    	//   Reduce  ,          
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    
            private final IntWritable result = new IntWritable();
    
    
            public void reduce( Text key , Iterable<IntWritable> values , Context context ) throws IOException, InterruptedException{
    
    
    			//         ,     value  
                int sum = 0;
                for ( final IntWritable val : values ) {
                    sum += val.get();
                }
                result.set( sum );
                context.write( key, result ); // key    ,value           
            }
        }
    
    
        public static void main( String[] args ) throws Exception{
    
    
            final Configuration conf = new Configuration();
    		//   MongoDB           ,        MongoDB,      27017
            MongoConfigUtil.setInputURI( conf, "mongodb://localhost/test.in" );
            MongoConfigUtil.setOutputURI( conf, "mongodb://localhost/test.out" );
            System.out.println( "Conf: " + conf );
    
    
            final Job job = new Job( conf , "word count" );
    
    
            job.setJarByClass( WordCount.class );
    
    
    		//   Mapper,Reduce Combiner 
            job.setMapperClass( TokenizerMapper.class );
    
    
            job.setCombinerClass( IntSumReducer.class );
            job.setReducerClass( IntSumReducer.class );
    
    
    		//   Mapper Reduce   key/value   
            job.setOutputKeyClass( Text.class );
            job.setOutputValueClass( IntWritable.class );
    
    
    		//   InputFormat OutputFormat   
            job.setInputFormatClass( MongoInputFormat.class );
            job.setOutputFormatClass( MongoOutputFormat.class );
    
    
            System.exit( job.waitForCompletion( true ) ? 0 : 1 );
        }
    }	

    4.ブロック化機構の簡単な説明ここでは、異なるshardに対するsplit動作は実現されていない.すなわち、異なるshard上に分布するデータに対しては、1つのMap動作しか生成されない.
    ここで私は1つの分片の構想を提供して、興味があるのは討論することができます.
    Collectionがブロック化されると、configデータベースが生成されます.このデータベースの下にchunksというテーブルがあり、各chunkにstart_が記録されています.rowとend_row,これらのchunkは異なるshard上に分布することができ,このCollectionを解析することによって各shard上のchunk情報を得ることができ,それによって各shard上のchunk情報を1つのInputSplitに組み合わせることができ,これがここのMongoInputSplitであり,これではMongoInputFormatというクラスのgetSplitsという方法を修正し,chunksテーブルの解析を加えるだけである.shardの情報を得ることで、マルチsplitのMap操作を実現することができ、異なるShardに対して、各MapはローカルのMongosエージェントサービスを呼び出し、モバイルデータではなくモバイルコンピューティングの目的を実現することができる.
    これは私の考えにすぎません.興味のある友达は一緒に討論してもいいです.
    私は具体的な実現を送ります.
    5.参考*https://github.com/mongodb/mongo-hadoop
    * http://www.mongodb.org/display/DOCS/Java+Language+Center