Pigに関する操作

25115 ワード

コマンドライン
Pigの常用命令:操作HFS
ls、cd、cat、mkdir、pwd、copyFromLocal(  )、copyToLocal(  )
sh:オペレーティングシステムのコマンドを呼び出します。
register、define ----->   pig       jar 
PigLatin文を使ってデータを分析します。
1、YarnのHigtoryServerを起動する必要があります。実行したすべてのタスクを記録します。
mr-jobhistory-daemon.sh start historyserver
URL:  http://ip:19888/jobhistory
2、よくあるPigLatin文(PigLatin文はSparkの演算子/方法と非常に似ています。)
(*)load          (bag)
(*)foreach        , bag     tuple    
(*)filter      、   where
(*)group by    
(*)order by    
(*)join        
(*)generate     
(*)unionintersect      
(*)  :dump    
   store       
注意:PigLatinは計算をトリガします。できないことがあります。Spark RDD演算子のようなものです。
Transformation  (  ) ----->       
Action  (  ) ------>    Spark   
3、PigLatin文を使う
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
(1)社員表empの作成
emp = load '/scott/emp.csv';
表示構造
describe emp;
Schema for emp unknown
(2)テーブルを作成し、schemaを指定する(構造)
emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);
デフォルトの列の種類:bytearrayデフォルトのセパレータ: 最終版:
emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
部門表を作成します
10,ACCOUNTING,NEW YORK
dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);
(3)従業員の情報を調べる:従業員の名前と給料
SQL:  select empno,ename,sal from emp;
PL:   emp3 = foreach emp generate empno,ename,sal;    ---->       
dump emp3;
(4)従業員の情報を調べ、月給順に並べ替える
SQL:  select * from emp order by sal;
PL:   emp4 = order emp by sal;
dump emp4;
(5)グループ化:各部門の最高賃金を求める:部門番号部門の最高賃金
SQL:  select deptno,max(sal) from emp group by deptno;
PL:最初のステップ:最初のグループ化
emp51 = group emp by deptno;
emp 51のテーブル構造を表示します。
emp51: {group: int,
        emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}
dump emp51;
データ
group         emp
(10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,,10),
     (7839,KING,PRESIDENT,,1981/11/17,5000,,10),
 (7782,CLARK,MANAGER,7839,1981/6/9,2450,,10)})

(20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,,20),
     (7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20),
 (7369,SMITH,CLERK,7902,1980/12/17,800,,20),
 (7566,JONES,MANAGER,7839,1981/4/2,2975,,20),
 (7902,FORD,ANALYST,7566,1981/12/3,3000,,20)})

(30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
     (7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
 (7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30),
 (7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
 (7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
 (7900,JAMES,CLERK,7698,1981/12/3,950,,30)})
第二ステップ:各グループ(各部門)の給与の最大値を求めます。
emp52 = foreach emp51 generate group,MAX(emp.sal)
(6)10番部門の従業員を問い合わせる
SQL:  select * from emp where deptno=10;
PL:   emp6 = filter emp by deptno==10;     :    
(7)複数表の照会:部門名、社員名
SQL:  select d.dname,e.ename from emp e,dept d where e.deptno=d.deptno;
PL:   emp71 = join dept by deptno,emp by deptno;
      emp72 = foreach emp71 generate dept::dname,emp::ename;
(8)集合演算:10と20番部門の従業員情報を問い合わせる
SQL:   select * from emp where deptno=10
       union
       select * from emp where deptno=20;
Oracleでは、任意の集合が集合演算に参加できますか?集合演算に参加する各集合は、列の数が同じであり、種類が一致している必要があります。
select deptno,job,sum(sal) from emp group by deptno,job
union 
select deptno,to_char(null),sum(sal) from emp group by deptno
union
select to_number(null),to_char(null),sum(sal) from emp;
PL:  emp10 = filter emp by deptno==10;
     emp20 = filter emp by deptno==20;
     emp1020 = union emp10,emp20;
(9)WordCountの実行
①      
mydata = load '/input/data.txt' as (line:chararray);

②           
words = foreach mydata generate flatten(TOKENIZE(line)) as word;

③         
grpd = group words by word; 

④           
cntd = foreach grpd generate group,COUNT(words); 

⑤      
dump cntd;  
注意:後ろの操作は前の操作に依存しています。SparkのRDD(依存関係)のようです。
Pigのカスタム関数
   jar
/root/training/pig-0.14.0/pig-0.14.0-core-h2.jar
/root/training/pig-0.14.0/lib
/root/training/pig-0.14.0/lib/h2
/root/training/hadoop-2.4.1/share/hadoop/common
/root/training/hadoop-2.4.1/share/hadoop/common/lib 
1、ユーザー定義の演算関数:従業員の給料によって、給料のレベルを判断する。
package pig;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

//       ,       
//    emp2 = foreach emp generate empno,ename,sal,    (sal)
// emp2 = foreach emp generate empno,ename,sal,pig.CheckSalaryGrade(sal);
public class CheckSalaryGrade extends EvalFunc<String>{

    @Override
    public String exec(Tuple tuple) throws IOException {
        //       
        //tuple      

        int sal = (int) tuple.get(0);
        if(sal <1000) return "Grade A";
        else if(sal>=1000 && sal<3000) return "Grade B";
        else return "Grade C";
    }
}
2、カスタムフィルタ関数:2000を超える従業員を調べます。
package pig;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

//      2000   
//   emp3 = filter emp by     (sal)
//  emp3 = filter emp by pig.IsSalaryTooHigh(sal);
public class IsSalaryTooHigh extends FilterFunc {

    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        //    
        int sal = (int) tuple.get(0);
        return sal>2000?true:false;
    }
}
3、カスタムのローディング関数(一番面倒くさい)はMRのjarパッケージも必要です。
package pig;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MyLoadFunction extends LoadFunc {

    //  HDFS    
    private RecordReader reader = null;

    @Override
    public InputFormat getInputFormat() throws IOException {
        //            :   
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        //  reader             
        //  : I love Beijing
        //    
        Tuple result = null;
        try{
            //       
            if(!this.reader.nextKeyValue()){
                //      
                return result;
            }

            //     
            String data = this.reader.getCurrentValue().toString();
            //    
            String[] words = data.split(" ");

            //     tuple
            result = TupleFactory.getInstance().newTuple();

            //           tuple,     tuple  bag ,    bag  result 
            //     
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for(String w:words){
                //           tuple
                Tuple aTuple  = TupleFactory.getInstance().newTuple();
                aTuple.append(w);  //     tuple

                //    tuple  bag
                bag.add(aTuple);
            }

            //    bag  result 
            result.append(bag);
        }catch(Exception ex){
            ex.printStackTrace();
        }

        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
        //reader  HDFS    
        this.reader = reader;
    }

    @Override
    public void setLocation(String path, Job job) throws IOException {
        //   HDFS   
        FileInputFormat.setInputPaths(job, new Path(path));
    }
}