No module named 'pyspark.streaming.kafka' even with older spark version
Problem description
In another similar question, they hint 'install older spark 2.4.5.'
The solution from the above link says to install Spark 2.4.5, and it does have KafkaUtils. But the problem is I can't download Spark 2.4.5; it's not available even in the archive.
I followed the advice and installed an older version of Spark, 2.4.6 (the only old version available), and I also have Python 3.7, kafka-python, and the pyspark libraries.
I have a spark_job.py file that needs to use Kafka:
from pyspark.streaming.kafka import KafkaUtils
When running 'python spark_job.py', the error persists:

ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
spark_job.py:
from __future__ import print_function
import sys
import os
import shutil
import json

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils  # this is the problem

outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'

def getSparkSessionInstance(sparkConf):
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

#-------------------------------------------------
# What I want to do per each RDD...
#-------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
        testDataFrame = spark.createDataFrame(rowRdd)
        testDataFrame.createOrReplaceTempView("treasury_stream")
        sql_query = get_sql_query()  # defined elsewhere
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
        except Exception as e:
            print("--> Oops! It seems there is an error with the DB!", e)
    except Exception as e:
        print("--> Oops! It seems there is an error!", e)

#-------------------------------------------------
# General function
#-------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")

    ssc = StreamingContext(sc, 2)
    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                          [topic],
                                                          {"metadata.broker.list": broker_list})
    except Exception:
        raise ConnectionError("Kafka error: Connection refused: "
                              "broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        shutil.rmtree(outputPath)

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()
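For context on the `json.loads(v[1])` line above: `KafkaUtils.createDirectStream` delivers each record as a (key, value) pair, so the JSON payload sits at index 1. A minimal standalone sketch (the sample record below is made up for illustration):

```python
import json

# A record from the direct stream arrives as a (key, value) tuple;
# the JSON message body is the value, i.e. index 1.
record = (None, '{"branch": "Almaty", "currency": "KZT", "amount": 456.2}')
parsed = json.loads(record[1])
print(parsed["currency"])  # KZT
```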
Answer
I just downgraded it using pip:
pip install --force-reinstall pyspark==2.4.6
I did not use Poetry. After reinstalling, the KafkaUtils package was recognized.
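The downgrade works because `pyspark.streaming.kafka` ships only with Spark 2.x; the module was removed in Spark 3.0 along with the old DStream Kafka integration. If your environment might have either version installed, you can fail early with a clear message instead of a late ModuleNotFoundError. A minimal sketch (the helper below is hypothetical, not a pyspark API):

```python
import re

def supports_kafka_utils(version):
    """Return True if this pyspark version still ships pyspark.streaming.kafka.

    The module exists in Spark 2.x and was removed in Spark 3.0,
    so a simple major-version check is enough here.
    """
    major = int(re.match(r"\d+", version).group(0))
    return major < 3

print(supports_kafka_utils("2.4.6"))  # True
print(supports_kafka_utils("3.0.1"))  # False
```

In a real script you would pass `pyspark.__version__` to the check before attempting the import.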