ヒントとトリックを使用してデータを接続してPython


Databricks Connectは素晴らしいです.それはApacheのための開発を行う喜びをスパークします.それがなければ、私はおそらく何か効果的であり、長い時間前にスパークを放棄します.

なぜ


Databicks接続は、スパークの魔法のローカルインスタンスです.あなたのマシンは、それがスパークのローカルインストールを使用していると思うでしょうが、実際には、リモートデータベースのインスタンスを使用します.なぜあなたはそれが欲しいですか?多くの場合、お使いのマシンに適合しない大規模なデータセットへのアクセスを必要とするので、多くの場合、データを扱うために大きな計算クラスタが必要です.
伝統的に、データのいくつかの小さなサブセットをローカルにしてcrunchingすることで、それを達成することができます.次に、リモートデータセットを実行して別のデータセットで実行することができますが、それは常に可能です.

インストール


私は、あなたがすでにMiniconda(またはあなたがあなたのPCについて気にしないならば、アナコンダ)をインストールしたと仮定します、そして、あなたは公式Databricks Connect docsで基本的なステップに従うことができます.
私も、あなたがJupyterをインストールしたと仮定します.そうでない場合はConda - conda install -c conda-forge notebookからインストールしてください.私は、これをあなたのベース環境にインストールすることを勧めます.
あなたが既にいないならば、ダウンロードして、JDK 8をインストールしてください.スパークがJava 8の下で走っているように、より高いバージョンを使用しないでください.また、あなたがSDKのX 64バージョンをインストールしていることを確認してください.binPATHを加え、JAVA_HOME環境変数を作成します.Windowsでは、通常は次のようになります.
PATH=C:\Program Files (x86)\Java\jdk1.8.0_261\bin
JAVA_HOME=C:\Program Files (x86)\Java\jdk1.8.0_261
Pythonのバージョン3.7を使ってConda環境を作成してください.
conda create --name dbconnect python=3.7
環境を活性化する
conda activate dbconnect
ツールV 6をインストールします.6
pip install -U databricks-connect==6.6.*
あなたのクラスタは、データリックスが動作するように2つの変数を設定する必要があります.
  • 3.5spark.databricks.service.server.enabledに設定される必要がある
  • trueポートに設定する必要があります.

  • 必要に応じてIDEからデータブロック接続を使用する必要があります.

    ジュピターウェイネス


    私はJupyterが私のベースのコンダ環境に一度インストールされて、私が作成しているすべての環境で複製されないようにしたいです.この場合、Jupyterサーバーを実行すると、自動的に新しい環境を拾わないように、DataBlockks Connectの新しく作成された環境が表示されません.
    これを解決するには、spark.databricks.service.port(Jupyterカーネルインテグレーション)をデータブロック接続環境にインストールします.
    conda install ipykernel
    
    jupyterに、現在の環境をカーネルとして追加する必要があることを教えてください.
    python -m ipykernel install --user --name dbconnect --display-name "Databricks Connect (dbconnect)"
    
    Jupyterをインストールし、再び起動するベース環境に戻ります.
    conda activate base
    jupyter kernel
    
    カーネルがリストに表示されます.

    Jupyterヒント

    ipykernelを設定することで、自動DataFrame機能を有効にすることができます.
    Jupyter機能を拡張するためにインストールできます.
    Jupyterルック&フィールをカスタマイズするには、extensions availableを使用します.

    このリポジトリ 多重データクラスタクラスタの使用


    一旦クラスタIDを含むすべてのパラメタを指定することによって、Databricks Connectがコマンドラインから構成されるならば、それが再び再構成されない限り、あなたはそのクラスタに結び付けられます.別のクラスタを使用するには、新しいConda環境を作成し、再度設定できます.ただし、すべてのクラスタが同じデータワークのワークスペースにある場合は、次のトリックを使用してクラスタ間を切り替えることができます.
    まず、コードのどこかにクラスタマップを作成します.
    clusters = {
        "dev": {
            "id": "cluster id",
            "port": "port"
        },
        "prod": {
            "id": "cluster id",
            "port": "port"
        }
    }
    
    ノートブックから呼び出す関数を記述します.
    def use_cluster(cluster_name: str):
        """
        When running via Databricks Connect, specify to which cluster to connect instead of the default cluster.
        This call is ignored when running in Databricks environment.
        :param cluster_name: Name of the cluster as defined in the beginning of this file.
        """
        real_cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName", None)
    
        # do not configure if we are already running in Databricks
        if not real_cluster_name:
            cluster_config = clusters.get(cluster_name)
            log.info(f"attaching to cluster '{cluster_name}' (id: {cluster_config['id']}, port: {cluster_config['port']})")
    
            spark.conf.set("spark.driver.host", "127.0.0.1")
            spark.conf.set("spark.databricks.service.clusterId", cluster_config["id"])
            spark.conf.set("spark.databricks.service.port", cluster_config["port"])
    
    
    マップからクラスタ名をspark.conf.set("spark.sql.repl.eagerEval.enabled", True)に渡すと、コードを実行する前に適切なクラスタが選択されます.それについての良いことは、彼らの環境で実行するときに無視されるように、Databksノートブックの呼び出しを残すことができます.

    ローカルvsリモート


    ノートブックがローカルまたはデータブロックで実行されているかどうかチェックする


    ここでのトリックは、データベースの特定の関数( DisplayHTMLのような)のいずれかがIPythonユーザー名前空間にあるかどうかを調べることです.
    def _check_is_databricks() -> bool:
        user_ns = ip.get_ipython().user_ns
        return "displayHTML" in user_ns
    

    スパークセッション


    Badricks Notesは自動的にuse_cluster変数を初期化します.したがって、あなたはそれを返すか、新しいローカルセッションを作成するかどうか決めることができます.
    def _get_spark() -> SparkSession:
        user_ns = ip.get_ipython().user_ns
        if "spark" in user_ns:
            return user_ns["spark"]
        else:
            spark = SparkSession.builder.getOrCreate()
            user_ns["spark"] = spark
            return spark
    

    偽造表示機能


    DataCramksはデータグラムをレンダリングする良いspark関数を持っています.私たちにはローカルにはありません.
    def _get_display() -> Callable[[DataFrame], None]:
        fn = ip.get_ipython().user_ns.get("display")
        return fn or _display_with_json
    

    dbutils


    実行する場所に応じて、display()のローカルインスタンスを作成するか、またはデータリックスで実行するときに事前に初期化されたインスタンスを作成できます.
    def _get_dbutils(spark: SparkSession):
        try:
            from pyspark.dbutils import DBUtils
            dbutils = DBUtils(spark)
        except ImportError:
            import IPython
            dbutils = IPython.get_ipython().user_ns.get("dbutils")
            if not dbutils:
                log.warning("could not initialise dbutils!")
        return dbutils
    

    すべてをまとめる


    上記の全てを単一のPythonファイルに置き、全てのノートブックから参照することができます.dbutils
    from typing import Any, Tuple, Callable
    
    from pyspark.sql import SparkSession, DataFrame
    import logging
    import IPython as ip
    from pyspark.sql.types import StructType, ArrayType
    import pyspark.sql.functions as f
    
    clusters = {
        "dev": {
            "id": "cluster id",
            "port": "port"
        },
        "prod": {
            "id": "cluster id",
            "port": "port"
        }
    }
    
    # Logging
    
    class SilenceFilter(logging.Filter):
        def filter(self, record: logging.LogRecord) -> int:
            return False
    
    
    logging.basicConfig(format="%(asctime)s|%(levelname)s|%(name)s|%(message)s", level=logging.INFO)
    logging.getLogger("py4j.java_gateway").addFilter(SilenceFilter())
    log = logging.getLogger("dbconnect")
    
    def _check_is_databricks() -> bool:
        user_ns = ip.get_ipython().user_ns
        return "displayHTML" in user_ns
    
    
    def _get_spark() -> SparkSession:
        user_ns = ip.get_ipython().user_ns
        if "spark" in user_ns:
            return user_ns["spark"]
        else:
            spark = SparkSession.builder.getOrCreate()
            user_ns["spark"] = spark
            return spark
    
    
    def _display(df: DataFrame) -> None:
        df.show(truncate=False)
    
    
    def _display_with_json(df: DataFrame) -> None:
        for column in df.schema:
            t = type(column.dataType)
            if t == StructType or t == ArrayType:
                df = df.withColumn(column.name, f.to_json(column.name))
        df.show(truncate=False)
    
    
    def _get_display() -> Callable[[DataFrame], None]:
        fn = ip.get_ipython().user_ns.get("display")
        return fn or _display_with_json
    
    
    def _get_dbutils(spark: SparkSession):
        try:
            from pyspark.dbutils import DBUtils
            dbutils = DBUtils(spark)
        except ImportError:
            import IPython
            dbutils = IPython.get_ipython().user_ns.get("dbutils")
            if not dbutils:
                log.warning("could not initialise dbutils!")
        return dbutils
    
    
    # initialise Spark variables
    is_databricks: bool = _check_is_databricks()
    spark: SparkSession = _get_spark()
    display = _get_display()
    dbutils = _get_dbutils(spark)
    
    
    def use_cluster(cluster_name: str):
        """
        When running via Databricks Connect, specify to which cluster to connect instead of the default cluster.
        This call is ignored when running in Databricks environment.
        :param cluster_name: Name of the cluster as defined in the beginning of this file.
        """
        real_cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName", None)
    
        # do not configure if we are already running in Databricks
        if not real_cluster_name:
            cluster_config = clusters.get(cluster_name)
            log.info(f"attaching to cluster '{cluster_name}' (id: {cluster_config['id']}, port: {cluster_config['port']})")
    
            spark.conf.set("spark.driver.host", "127.0.0.1")
            spark.conf.set("spark.databricks.service.clusterId", cluster_config["id"])
            spark.conf.set("spark.databricks.service.port", cluster_config["port"])
    
    
    ノートブックで
    from dbconnect import spark, dbutils, use_cluster, display
    
    use_cluster("dev")
    
    # ...
    
    df = spark.table("....")    # use spark variable
    
    display(df) # display DataFrames
    
    # etc...
    

    メモリ問題の修正


    データブロック接続を使用する場合は、dbconnect.pyなどのエラーが発生する場合があります.これは単にローカルスパークノード(ドライバ)がメモリ不足になっていることを意味します.あなたがより多くのメモリを必要とする場合は、それを増やすのは簡単です.
    まず最初に、PySparkのホームディレクトリがどこにあるかを見つけてください.
    ❯ databricks-connect get-spark-home
    c:\users\ivang\miniconda3\envs\hospark\lib\site-packages\pyspark
    
    これはサブフォルダJava Heap Space etc. etc. etc.(存在しない場合は作成)を持つ必要があります.とファイルconf (再度、create ifが存在しない).フルファイルのパスはspark-defaults.confです.行を加える
    spark.driver.memory 8g
    
    ドライバはメモリを8ギガバイトにする.

    監視仕事


    残念ながら、私はDBC環境から仕事を監視する良い方法を見つけることができませんでした.Intellijのための プラグインがあり、理論的にはスパークジョブの監視をサポートしており、DBCは仮想的なローカルクラスタを実行すると考えています.しかし、どんな設定でも運がない.
    私が見つけたジョブを監視する最良の方法は、あなたが接続しているDatabricksのクラスタからスパークUIを使用することです.
    元来はBig Data Tools年である.