使用 PySpark 结构化流将 Kafka 流接收到 MongoDB [英] Sink Kafka Stream to MongoDB using PySpark Structured Streaming
本文介绍了使用 PySpark 结构化流将 Kafka 流接收到 MongoDB的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我的火花:
spark = SparkSession\
.builder\
.appName("Demo")\
.master("local[3]")\
.config("spark.streaming.stopGracefullyonShutdown", "true")\
.config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
.getOrCreate()
Mongo URI:
input_uri_weld = 'mongodb://127.0.0.1:27017/db.coll1'
output_uri_weld = 'mongodb://127.0.0.1:27017/db.coll1'
将流批处理写入 Mongo 的函数:
Function for writing stream batches to Mongo:
def save_to_mongodb_collection(current_df, epoc_id, mongodb_collection_name):
current_df.write\
.format("com.mongodb.spark.sql.DefaultSource") \
.mode("append") \
.option("spark.mongodb.output.uri", output_uri_weld) \
.save()
卡夫卡流:
kafka_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("subscribe", kafka_topic)\
.option("startingOffsets", "earliest")\
.load()
写信给 Mongo:
mongo_writer = df_parsed.write\
.format('com.mongodb.spark.sql.DefaultSource')\
.mode('append')\
.option("spark.mongodb.output.uri", output_uri_weld)\
.save()
&我的 spark.conf 文件:
& my spark.conf file:
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0
错误:
java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
推荐答案
我找到了解决方案.因为我找不到适合结构化流的 Mongo 驱动程序,所以我研究了另一个解决方案.现在,我使用与 mongoDb 的直接连接,并使用foreach(...)";而不是 foreachbatch(...).我的代码在 testSpark.py 文件中如下所示:
I found a solution. Since I couldn't find the right Mongo driver for Structured Streaming, I worked on another solution. Now, I use the direct connection to mongoDb, and use "foreach(...)" instead of foreachbatch(...). My code looks like this in testSpark.py file:
....
import pymongo
from pymongo import MongoClient
local_url = "mongodb://localhost:27017"
def write_machine_df_mongo(target_df):
cluster = MongoClient(local_url)
db = cluster["test_db"]
collection = db.test1
post = {
"machine_id": target_df.machine_id,
"proc_type": target_df.proc_type,
"sensor1_id": target_df.sensor1_id,
"sensor2_id": target_df.sensor2_id,
"time": target_df.time,
"sensor1_val": target_df.sensor1_val,
"sensor2_val": target_df.sensor2_val,
}
collection.insert_one(post)
machine_df.writeStream\
.outputMode("append")\
.foreach(write_machine_df_mongo)\
.start()
这篇关于使用 PySpark 结构化流将 Kafka 流接收到 MongoDB的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文