导入Pyspark Delta Lake模块时找不到模块错误 [英] Module not found error when importing Pyspark Delta Lake module

查看:130
本文介绍了导入Pyspark Delta Lake模块时找不到模块错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在用三角洲湖泊运行Pyspark,但是当我尝试导入三角洲模块时,我得到一个 ModuleNotFoundError:没有名为三角洲"的模块.这是在没有互联网连接的计算机上,因此我不得不从 Maven 并将其放置在%SPARK_HOME%/jars 文件夹中.

I'm running Pyspark with delta lake but when I try to import the delta module I get a ModuleNotFoundError: No module named 'delta'. This is on a machine without an internet connection so I had to download the delta-core jar manually from Maven and place it into the %SPARK_HOME%/jars folder.

我的程序可以正常工作,并且能够在三角洲湖中读写,所以我很高兴我拥有正确的jar.但是,当我尝试从delta.tables import * 导入增量模块时,我得到了错误.

My program works without any issues and I'm able to write and read from delta lake so I'm happy I've got the correct jar. But when I try and import the delta module from delta.tables import * I get the error.

有关信息,我的代码是:

For information my code is:

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, FloatType, StructType, StructField
from pyspark.sql.functions import input_file_name
from Constants import Constants

if __name__ == "__main__":
    constants = Constants()
    spark = SparkSession.builder.master("local[*]")\
                                .appName("Delta Lake Testing")\
                                .getOrCreate()

    # have to start spark session before importing: https://docs.delta.io/latest/quick-start.html#python
    from delta.tables import *

    # set logging level to limit output
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    # push additional python files to the worker nodes
    base_path = os.path.abspath(os.path.dirname(__file__))
    spark.sparkContext.addPyFile(os.path.join(base_path, 'Constants.py'))

    # start pipeline
    schema = StructType([StructField("Timestamp", TimestampType(), False),\
                        StructField("ParamOne", FloatType(), False),\
                        StructField("ParamTwo", FloatType(), False),\
                        StructField("ParamThree", FloatType(), False)])

    df = spark.readStream\
               .option("header", "true")\
               .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\
               .schema(schema)\
               .csv(constants.input_path)\
               .withColumn("input_file_name", input_file_name())

     df.writeStream\
       .format("delta")\
       .outputMode("append")\
       .option("checkpointLocation", constants.checkpoint_location)\
       .start("/tmp/bronze")

    # await on stream
    sqm = spark.streams
    sqm.awaitAnyTermination()

这使用的是Spark v2.4.4和Python v3.6.1,作业是使用 spark-submit path/to/job.py

This is using Spark v2.4.4 and Python v3.6.1 and the job is submitted using spark-submit path/to/job.py

推荐答案

%pyspark
sc.addPyFile("**LOCATION_OF_DELTA_LAKE_JAR_FILE**")
from delta.tables import *

这篇关于导入Pyspark Delta Lake模块时找不到模块错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆