Python Study Notes - Operating Hive with Spark


To support Python on Spark, the Apache Spark community released the PySpark tool.
Installing PySpark online:
pip install -U -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
Installing PySpark offline:
Download from: https://pypi.org/project/wheel/#files
wheel-0.33.6>python setup.py install
Download from: https://pypi.org/project/pyspark/#files
pyspark-2.4.4>python setup.py install
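
To confirm that the installation succeeded, a quick check is to import the package and print its version (a minimal sketch; the reported version should match the package installed above, e.g. 2.4.4):

import pyspark

# Prints the installed PySpark version string.
print(pyspark.__version__)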
 
Install the related software in the Windows environment and configure the corresponding environment variables:
JAVA_HOME C:\Program Files\Java\jdk1.8.0_101
HADOOP_HOME F:\develop\hadoop\hadoop-2.10.0
SCALA_HOME F:\develop\scala\scala-2.12.8
SPARK_HOME F:\develop\spark\spark-2.4.4-bin-hadoop2.7
Add the corresponding bin directories to the PATH environment variable.
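
Before launching anything, it is worth confirming that these variables are visible to the Python process. A minimal sketch (the variable names are the ones configured above):

import os

# Print each required variable, or flag it when it is missing.
for name in ('JAVA_HOME', 'HADOOP_HOME', 'SCALA_HOME', 'SPARK_HOME'):
    print('%s = %s' % (name, os.environ.get(name, '<not set>')))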
 
Copy the Hive configuration file hive-site.xml into the $SPARK_HOME/conf directory.
Copy the MySQL driver jar into the $SPARK_HOME/jars directory.
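
If editing hive-site.xml is inconvenient, the metastore connection can usually also be passed as configuration when the SparkSession is built. A minimal sketch, assuming a remote Hive metastore at thrift://metastore-host:9083 (a hypothetical address) and an HDFS warehouse path:

from pyspark.sql import SparkSession

# Hypothetical metastore address and warehouse directory -- replace with your own values.
spark = SparkSession.builder \
    .appName("HiveConfigSketch") \
    .config("hive.metastore.uris", "thrift://metastore-host:9083") \
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show databases").show()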
 
Example:
# -*- coding:utf-8 -*-

import sys
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Force the interpreter's default encoding to UTF-8 (needed on Python 2 only).
default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding:
    reload(sys)
    sys.setdefaultencoding(default_encoding)


class BaseSparkSession(object):

    SPARK_APP_NAME = None
    SPARK_MASTER_URL = 'yarn'

    SPARK_EXECUTOR_CORES = 2
    SPARK_EXECUTOR_MEMORY = '2g'
    SPARK_EXECUTOR_INSTANCES = 2

    SPARK_YARN_QUEUE = None

    ENABLE_HIVE_SUPPORT = False

    def create_spark_session(self):
        spark_conf = SparkConf()
        settings = (
            ("spark.app.name", self.SPARK_APP_NAME),
            ("spark.master", self.SPARK_MASTER_URL),
            ("spark.executor.cores", self.SPARK_EXECUTOR_CORES),
            ("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY),
            ("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES),
            ("spark.yarn.queue", self.SPARK_YARN_QUEUE),
        )
        # Skip settings that are still None (e.g. spark.yarn.queue in local mode),
        # otherwise SparkConf would store the literal string "None".
        spark_conf.setAll([(key, value) for key, value in settings if value is not None])
        if self.ENABLE_HIVE_SUPPORT:
            return SparkSession.builder.config(conf=spark_conf).enableHiveSupport().getOrCreate()
        else:
            return SparkSession.builder.config(conf=spark_conf).getOrCreate()

The BaseSparkSession class above is saved as spark.py; the script below imports it and exercises both local and YARN sessions.

# -*- coding:utf-8 -*-

import os
import sys
from spark import BaseSparkSession
from pyspark.sql.functions import col

default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding:
    reload(sys)
    sys.setdefaultencoding(default_encoding)

# Raw strings keep the Windows backslashes from being interpreted as escape sequences.
os.environ['PYSPARK_PYTHON'] = r'F:\develop\python\Python27\python.exe'
os.environ['HADOOP_HOME'] = r'F:\develop\hadoop\hadoop-2.10.0'
os.environ['HADOOP_CONF_DIR'] = r'F:\develop\hadoop\hadoop-2.10.0-conf'
os.environ['SPARK_HOME'] = r'F:\develop\spark\spark-2.4.4-bin-hadoop2.7'


class SparkYarnHiveOp(BaseSparkSession):
    SPARK_APP_NAME = "SparkYarnHiveOp"
    SPARK_YARN_QUEUE = "queue3"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark_session = self.create_spark_session()


class SparkLocalOp(BaseSparkSession):
    SPARK_APP_NAME = "SparkLocalOp"
    SPARK_MASTER_URL = "local[*]"
    ENABLE_HIVE_SUPPORT = False

    def __init__(self):
        self.spark_session = self.create_spark_session()


class SparkLocalHiveOp(BaseSparkSession):
    SPARK_APP_NAME = "SparkLocalHiveOp"
    SPARK_MASTER_URL = "local[*]"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark_session = self.create_spark_session()


if __name__ == '__main__':
    spark_local_op = SparkLocalOp()
    # range() produces a column named "id"; cast and rename it so the aggregation below works.
    data = spark_local_op.spark_session.range(0, 100).select(col("id").cast("double").alias("count"))
    data.show()
    data.agg({'count': 'sum'}).show()

    spark_local_hive_op = SparkLocalHiveOp()
    spark_local_hive_op.spark_session.sql('use default')
    spark_local_hive_op.spark_session.sql('select * from user').show()

    spark_yarn_hive_op = SparkYarnHiveOp()
    spark_yarn_hive_op.spark_session.sql('use default')
    spark_yarn_hive_op.spark_session.sql('select * from user').show()
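    # The queries above only read from Hive; writing back goes through the DataFrame writer.
    # A minimal sketch (the target table name user_copy is hypothetical):
    df = spark_yarn_hive_op.spark_session.sql('select * from user')
    df.write.mode('overwrite').saveAsTable('default.user_copy')
    spark_yarn_hive_op.spark_session.sql('select count(*) from user_copy').show()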

Record of exceptions encountered:
SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
Fix: point HADOOP_CONF_DIR at the Hadoop configuration directory in the environment before creating the SparkSession, for example:
os.environ['PYSPARK_PYTHON'] = r'F:\develop\python\Python27\python.exe'
os.environ['HADOOP_HOME'] = r'F:\develop\hadoop\hadoop-2.10.0'
os.environ['HADOOP_CONF_DIR'] = r'F:\develop\hadoop\hadoop-2.10.0-conf'
os.environ['SPARK_HOME'] = r'F:\develop\spark\spark-2.4.4-bin-hadoop2.7'
 
ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
Copy jersey-core-1.9.jar and jersey-client-1.9.jar from the
F:\develop\hadoop\hadoop-2.10.0\share\hadoop\yarn\lib directory into the
F:\develop\spark\spark-2.4.4-bin-hadoop2.7\jars directory, and delete the old jersey jar packages already in that directory.
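
To confirm which jersey jars Spark will actually pick up after the copy, the jars directory can be listed from Python. A minimal sketch using the SPARK_HOME path from this note:

import glob
import os

spark_jars = os.path.join(r'F:\develop\spark\spark-2.4.4-bin-hadoop2.7', 'jars')
# List every jersey jar currently in Spark's jars directory.
for jar in sorted(glob.glob(os.path.join(spark_jars, 'jersey*.jar'))):
    print(jar)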