PySpark呼び出しカスタムjarパッケージ

4754 ワード

PySparkプログラムの開発には通常Javaのオブジェクトが必要であり、PySpark自体もJava APIの上に構築され、Py 4 jによってJavaSparkContextが作成される.
ここで注意すべき点はいくつかあります
1.Py4jdriverでのみ動作
つまりworkerは現在、サードパーティのjarパッケージを導入できません.workerノードのPySparkはPy 4 jの通信プロセスを開始していないので、対応するjarパケットも自然にロードできません.これまでこの文書を詳しく見ていなかったが、システム設計時にworkerノードでclientモード直結Hbaseを利用して一部のデータを取得し、テーブル全体のJOIN操作を回避しようとしたが、pythonにとってこのような操作はjarパッケージを導入することによってのみ実現される(thrift方式は考慮されない).しかし、テストのjarが書かれた後、ずっと成功せず、最後に案を修正してから公式文書を調べた.
2.PythonRDDの原型はJavaRDD[String]である
PythonRDDを介して伝達されたすべてのデータはBASE 64によって符号化される
3.PySparkの方法および匿名関数は、cloudpickleによってシーケンス化される
なぜ関数がシーケンス化される必要があるのか、mapまたはflatMapを行う場合、このときの関数またはlambda式は各worderに伝達される必要があり、関数に閉パッケージがあればcloudpickleも巧みにシーケンス化される.ただし,伝達が必要な関数にはselfのキーワードは使わないでください.伝達後,selfの指代関係は不明になっているからです.
ドキュメントにはPythonRDDのシーケンス化がカスタマイズ可能であることも記載されていますが、現在はこのニーズがなく、テストされていません.
コードの例
JAvaテストコード、コンパイル生成pyspark-test.jar
package org.valux.py4j;

public class Calculate {

    public int sqAdd(int x){

        return x * x + 1;

    }

}

Pythonテストコード、ファイルdriver.pyに置く
from pyspark import SparkContext

from py4j.java_gateway import java_import



sc = SparkContext(appName="Py4jTesting")

java_import(sc._jvm, "org.valux.py4j.Calculate")

func = sc._jvm.Calculate()

print func.sqAdd(5)

"""

[OUTPUT] > 26

"""

 
"""

 !!![    ]

        work         ,

        PySpark       

"""

rdd = sc.parallelize([1, 2, 3])

result = rdd.map(func.sqAdd).collect()



"""

 !!![    ]

              work   import     jar    

"""

def foo(x):

    java_import(sc._jvm, "org.valux.py4j.Calculate")

    func = sc._jvm.Calculate()

    func.sqAdd(x)

rdd = sc.parallelize([1, 2, 3])

rdd.map(foo).collect()

 
   ,          jar 
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py


ここにまた穴があります.前に提出して便利にするために、ずっと使っていたのは--jarsパラメータです.
--driver-class-path追加jarはdriverでのみ導入されます--jars追加jarはすべてのworkerで導入されます
ヘルプドキュメントには
--jars Comma-separated list of local jars to include on the driver and executor classpaths.
すべてが怠け者になった--jars、結果はずっと次のような間違いを報告しています.
py4j.protocol.Py4JError: Trying to call a package.
テストしてやっと解決した
リファレンスドキュメント
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals