PySpark呼び出しカスタムjarパッケージ
4754 ワード
PySpark
プログラムの開発には通常Javaのオブジェクトが必要であり、PySpark
自体もJava APIの上に構築され、Py 4 jによってJavaSparkContext
が作成される.ここで注意すべき点はいくつかあります
1.
Py4j
はdriver
でのみ動作つまり
worker
は現在、サードパーティのjarパッケージを導入できません.worker
ノードのPySparkはPy 4 jの通信プロセスを開始していないので、対応するjarパケットも自然にロードできません.これまでこの文書を詳しく見ていなかったが、システム設計時にworker
ノードでclientモード直結Hbaseを利用して一部のデータを取得し、テーブル全体のJOIN操作を回避しようとしたが、pythonにとってこのような操作はjarパッケージを導入することによってのみ実現される(thrift方式は考慮されない).しかし、テストのjarが書かれた後、ずっと成功せず、最後に案を修正してから公式文書を調べた.2.
PythonRDD
の原型はJavaRDD[String]
であるPythonRDDを介して伝達されたすべてのデータはBASE 64によって符号化される
3.
PySpark
の方法および匿名関数は、cloudpickle
によってシーケンス化されるなぜ関数がシーケンス化される必要があるのか、
map
またはflatMap
を行う場合、このときの関数またはlambda式は各worder
に伝達される必要があり、関数に閉パッケージがあればcloudpickle
も巧みにシーケンス化される.ただし,伝達が必要な関数にはself
のキーワードは使わないでください.伝達後,self
の指代関係は不明になっているからです.ドキュメントには
PythonRDD
のシーケンス化がカスタマイズ可能であることも記載されていますが、現在はこのニーズがなく、テストされていません.コードの例
JAvaテストコード、コンパイル生成
pyspark-test.jar
package org.valux.py4j;
public class Calculate {
public int sqAdd(int x){
return x * x + 1;
}
}
Pythonテストコード、ファイル
driver.py
に置くfrom pyspark import SparkContext
from py4j.java_gateway import java_import
sc = SparkContext(appName="Py4jTesting")
java_import(sc._jvm, "org.valux.py4j.Calculate")
func = sc._jvm.Calculate()
print func.sqAdd(5)
"""
[OUTPUT] > 26
"""
"""
!!![ ]
work ,
PySpark
"""
rdd = sc.parallelize([1, 2, 3])
result = rdd.map(func.sqAdd).collect()
"""
!!![ ]
work import jar
"""
def foo(x):
java_import(sc._jvm, "org.valux.py4j.Calculate")
func = sc._jvm.Calculate()
func.sqAdd(x)
rdd = sc.parallelize([1, 2, 3])
rdd.map(foo).collect()
, jar
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py
ここにまた穴があります.前に提出して便利にするために、ずっと使っていたのは--jarsパラメータです.
--driver-class-path追加jarは
driver
でのみ導入されます--jars追加jarはすべてのworker
で導入されますヘルプドキュメントには
--jars Comma-separated list of local jars to include on the driver and executor classpaths.
すべてが怠け者になった--jars、結果はずっと次のような間違いを報告しています.
py4j.protocol.Py4JError: Trying to call a package.
テストしてやっと解決した
リファレンスドキュメント
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals