Hadoop基本プロセスと応用開発

10615 ワード

著者岑文初は2008年8月13日午前1時54分に発表された.
コミュニティ
Java
テーマ
メッシュ計算、
クラスタとキャッシュ
タブ
Hadoop
——分布式計算オープンソースフレームワークHadoop入門実践(三)
Hadoop基本フロー
1つの画像が大きすぎて、2つの部分に分割するしかありません.フローチャートに基づいて、特定のタスクが実行される場合について説明します.
  • クライアントは、分散環境においてタスクを作成し、送信する.
  • InputFormatはMapの前処理を行い、主に以下の仕事を担当しています.
  • 入力フォーマットがJobConfigの入力定義に合っているかどうかを検証します.これはMapとConfの構築を実現するときにわかります.Writableの任意のサブクラスを定義しなくてもいいです.
  • inputのファイルを論理的な入力Input Splitに分割しますが、これは前述した分散ファイルシステムではblocksizeにサイズ制限があるため、大きなファイルは複数のblockに分割されます.
  • は、RecordReaderによってinputsplitをrecordsのセットとして再処理し、Mapに出力する.(inputsplitは論理的切り分けの第一歩にすぎないが、ファイル内の情報に基づいてどのように切り分けるかはRecordReaderで実現する必要があり、例えば最も簡単なデフォルトは折り返し折り返しの切り分けである)
  • .
  • RecordReader処理後の結果をMapの入力とし,Mapは定義されたMapロジックを実行し,出力処理後のkeyとvalueは一時中間ファイルに対応する.
  • Combinerは構成を選択することができ、主な役割は各Mapが分析を実行した後、ローカルで優先的にReduceの仕事をし、Reduceの過程でのデータ伝送量を減らすことである.
  • Partitionerは構成を選択できます.主な役割は、複数のReduceの場合、指定されたMapの結果は、あるReduceによって処理され、各Reduceには個別の出力ファイルがあります.(後述のコード例では使用シーンについて説明する)
  • Reduceは、特定のビジネスロジックを実行し、処理結果をOutputFormatに出力する.
  • OutputFormatの役割は、出力ディレクトリがすでに存在するかどうかを検証するとともに、出力結果タイプがConfigで構成されているかどうかを検証し、最後にReduceがまとめた結果を出力することです.

  • ビジネスシーンとコードの例
    ビジネスシーンの説明:入力と出力のパス(オペレーティングシステムのパスはHDFSではない)を設定することができ、あるアプリケーションがあるAPIにアクセスする総回数と総流量をアクセスログに基づいて分析し、統計した後にそれぞれ2つのファイルに出力する.ここではテストのために、多くのクラスを細分化しないで、すべてのクラスを1つのクラスにまとめて問題を説明しやすい.
    テストコードクラス図
    LogAnalysiserはプライマリクラスであり、主にタスクの作成、コミット、および一部の情報の出力を担当します.内部のいくつかのサブクラスの用途は、プロセスで説明されているロールロールを参照してください.具体的には、いくつかのクラスとメソッドのコード断片を見てみましょう.
    LogAnalysiser::MapClass     public static class MapClass extends MapReduceBase         implements Mapper     {         public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)                 throws IOException         {                String line = value.toString();// RecordReader, line ,key ,value             if (line == null || line.equals(""))                 return;             String[] words = line.split(",");             if (words == null || words.length < 8)                 return;             String appid = words[1];             String apiName = words[2];             LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));             Text record = new Text();             record.set(new StringBuffer("flow::").append(appid)                             .append("::").append(apiName).toString());             reporter.progress();             output.collect(record, recbytes);// , flow:: 。             record.clear();             record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());             output.collect(record, new LongWritable(1));// , count::         }        } LogAnalysiser::PartitionerClass     public static class PartitionerClass implements Partitioner     {         public int getPartition(Text key, LongWritable value, int numPartitions)         {             if (numPartitions >= 2)//Reduce , Reduce                 if (key.toString().startsWith("flow::"))                     return 0;                 else                     return 1;             else                 return 0;         }         public void configure(JobConf job){}    }LogAnalysiser::CombinerClass
    ReduceClassを参照すると、通常は2つを使用できますが、ここではいくつかの異なる処理が2つに分かれています.ReduceClassの青い行はCombinerClassには存在しないことを示します.
    LogAnalysiser::ReduceClass     public static class ReduceClass extends MapReduceBase         implements Reducer     {         public void reduce(Text key, Iterator values,                 OutputCollector output, Reporter reporter)throws IOException         {             Text newkey = new Text();             newkey.set(key.toString().substring(key.toString().indexOf("::")+2));             LongWritable result = new LongWritable();             long tmp = 0;             int counter = 0;             while(values.hasNext())// key             {                 tmp = tmp + values.next().get();                                 counter = counter +1;// ,JobTracker TaskTracker ,                 if (counter == 1000)                 {                     counter = 0;                     reporter.progress();                 }             }             result.set(tmp);             output.collect(newkey, result);//         }        }LogAnalysiser public static void main(String[] args) { try { run(args); } catch (Exception e) { e.printStackTrace(); } } public static void run(String[] args) throws Exception { if (args == null || args.length <2) { System.out.println("need inputpath and outputpath"); return; } String inputpath = args[0]; String outputpath = args[1]; String shortin = args[0]; String shortout = args[1]; if (shortin.indexOf(File.separator) >= 0) shortin = shortin.substring(shortin.lastIndexOf(File.separator)); if (shortout.indexOf(File.separator) >= 0) shortout = shortout.substring(shortout.lastIndexOf(File.separator)); SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd"); shortout = new StringBuffer(shortout).append("-") .append(formater.format(new Date())).toString(); if (!shortin.startsWith("/")) shortin = "/" + shortin; if (!shortout.startsWith("/")) shortout = "/" + shortout; shortin = "/user/root" + shortin; shortout = "/user/root" + shortout; File inputdir = new File(inputpath); File outputdir = new File(outputpath); if (!inputdir.exists() || !inputdir.isDirectory()) { System.out.println("inputpath not exist or isn't dir!"); return; } if (!outputdir.exists()) { new File(outputpath).mkdirs(); } JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);// Config FileSystem fileSys = FileSystem.get(conf); fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));// HDFS conf.setJobName("analysisjob"); conf.setOutputKeyClass(Text.class);// key , OutputFormat conf.setOutputValueClass(LongWritable.class); // value , OutputFormat conf.setMapperClass(MapClass.class); conf.setCombinerClass(CombinerClass.class); conf.setReducerClass(ReduceClass.class); conf.setPartitionerClass(PartitionerClass.class); conf.set("mapred.reduce.tasks", "2");// Reduce FileInputFormat.setInputPaths(conf, shortin);//hdfs FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(conf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); // fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath)); fileSys.delete(new Path(shortin),true); fileSys.delete(new Path(shortout),true); } 以上のコードはすべての論理コードを完了し、ビジネスクラスを表示可能なコマンドとして登録し、hadoop jarが実行できるようにする登録駆動クラスが必要です.public class ExampleDriver {   public static void main(String argv[]){     ProgramDriver pgd = new ProgramDriver();     try {       pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");       pgd.driver(argv);     }     catch(Throwable e){       e.printStackTrace();     }   } }コードをjarにし、jarのmainClassをExampleDriverというクラスに設定します.分散環境の起動後に、次の文を実行します.hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out/home/wenchu/test-inでは解析が必要なログファイルであり、実行するとMapとReduceの進捗を含む実行プロセス全体が表示されます.実行が完了すると、/home/wenchu/test-outの下に出力の内容が表示されます.2つのファイルがあります.part-00000とpart-00001はそれぞれ統計後の結果を記録しています.実行の詳細を見る必要がある場合は、出力ディレクトリの下にある_logs/history/xxxx_analysisjobには、すべてのMap、Reduceの作成状況、実行状況が羅列されています.実行時には、ブラウザでMap、Reduceの状況を表示することもできます.http://MasterIP:50030/jobtracker.jsp
    Hadoopクラスタテスト
    まず、ここでは上記の例をテストとして使用し、構成の最適化もあまり行われていません.このテスト結果は、クラスタの効果とパラメータ構成の影響を見るためだけです.
    ファイルコピー数1,blocksize 5 M
    2
    95
    38
    2
    950
    337
    4
    95
    24
    4
    950
    178
    6
    95
    21
    6
    950
    114
    Blocksize 5M
    2(ファイルコピー数1)
    950
    337
    2(ファイルコピー数3)
    950
    339
    6(ファイルコピー数1)
    950
    114
    6(ファイルコピー数3)
    950
    117
    ファイルコピー数1
    6(blocksize 5M)
    95
    21
    6(blocksize 77M)
    95
    26
    4(blocksize 5M)
    950
    178
    4(blocksize 50M)
    950
    54
    6(blocksize 5M)
    950
    114
    6(blocksize 50M)
    950
    44
    6(blocksize 77M)
    950
    74
    テストのデータ結果は安定しており,基本的に何回か同じ条件下で同じである.テストの結果、次の点がわかります.
  • マシン数は性能に役立ちます(言わなかった^^).
  • ファイルのコピー数の増加は、セキュリティにのみ役立ちますが、パフォーマンスにはあまり役立ちません.また、現在はオペレーティングシステムファイルをHDFSにコピーしているため、バックアップが多く、準備時間が長い.
  • blocksizeはパフォーマンスに大きな影響を及ぼします.まずblockを小さく分割するとjobの数が増加し、コラボレーションのコストも増加し、パフォーマンスが低下しますが、構成が大きすぎるとjobが並列処理を最大化できません.したがって,この値の構成はデータ処理の量に応じて考慮する必要がある.
  • 最後にこの表に列挙された結果を除いて、出力ディレクトリの中の_をよく見るべきです.logs/historyのxxx_analysisjobというファイルには、すべての実行過程と読み書き状況が記録されています.これは、どこがもっと時間がかかるかをより明確に知ることができます.

  • 随想
    「クラウドコンピューティング」の熱い熱い手は、SAAS、Web 2、SNSなどと同じように、概念を作っていることが多い.実はあなたのデータ量がそんなに大きくないとき、この分布式計算はただのおもちゃで、本当に問題を解決する過程で、その深い問題が掘り起こされます.
    この3つの文章.(分布式計算オープンソースフレームワークHadoop紹介、Hadoopでのクラスタ配置と使用テクニック)分布式計算に興味のある友人にレンガを投げるためだけに、本当に金を掘るには、着実に使い、考え、分析する.あるいは自分もフレームワークでの実現メカニズムをさらに研究し、自分の問題を解決すると同時に、貢ぐことができる何かを献上する.
    先日、ある人がひざまずいてアーキテクチャになる方法を求めているのを見て、少し悲しくて、少しおかしいですが、実際にはアーキテクチャとは何か知っていますか?アーキテクチャの役割は何ですか.このような名号を追求するよりも、着実に石を作って水底に沈んだほうがいい.蓄積と沈殿の過程が成長であることを知っておく必要があります.
    関連情報:
  • 分布式計算オープンソースフレームワークHadoop紹介――分布式計算オープンソースフレームワークHadoop入門実践(一).
  • Hadoopのクラスタ構成と使用方法――分散計算オープンソースフレームワークHadoop入門実践(二).
  • 著者紹介:岑文初、アリソフトウェア会社の研究開発センタープラットフォームの一部に就職し、アーキテクチャ師を務めた.現在の主な仕事は阿里ソフトウェア開発プラットフォームサービスフレームワーク(ASF)の設計と実現、サービス集積プラットフォーム(SIP)の設計と実現に関連している.何も得意ではなく、精通していない.仕事が今まで唯一向上したのは学習能力とスピードである.個人Blogは:http://blog.csdn.net/cenwenchu79 .
    ボランティアでInfoQ中国語ステーションの内容の建設に参与して、メールを下さい[email protected] .InfoQ中国語ステーションのユーザーディスカッショングループへようこそ.