Writing Spark streaming PySpark dataframe to Cassandra overwrites table instead of appending
Problem description
I am running a 1-node cluster of Kafka, Spark, and Cassandra, all locally on the same machine.
From a simple Python script I stream some dummy data into a Kafka topic every 5 seconds. Then, using Spark Structured Streaming, I read this data stream (one row at a time) into a PySpark DataFrame with startingOffsets = latest. Finally, I try to append this row to an already existing Cassandra table.
I have been following (How to write streaming Dataset to Cassandra?) and (Cassandra Sink for PySpark Structured Streaming from Kafka topic).
One row of data is being successfully written into the Cassandra table but my problem is it's being overwritten every time rather than appended to the end of the table. What might I be doing wrong?
Here's my code:
CQL DDL for creating the kafkaspark keyspace followed by the randintstream table in Cassandra:
DESCRIBE keyspaces;
CREATE KEYSPACE kafkaspark
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE kafkaspark;
CREATE TABLE randIntStream (
key int,
value int,
topic text,
partition int,
offset bigint,
timestamp timestamp,
timestampType int,
PRIMARY KEY (partition, topic)
);
Launch PySpark shell
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
Read latest message from Kafka topic into streaming DataFrame:
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
Some transformations and checking schema:
df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()
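For intuition, the chained casts mirror how the raw Kafka bytes become integers; a minimal pure-Python sketch of the same conversion (no Spark required, the payload is made up):

```python
# Kafka hands Spark the key and value as raw bytes; the chained casts in
# df2/df3 above amount to bytes -> string -> integer.
raw_value = b"42"                      # made-up payload from the producer
as_string = raw_value.decode("utf-8")  # equivalent of cast("string")
as_int = int(as_string)                # equivalent of cast("integer")
print(as_int)  # 42
```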
Function for writing to Cassandra:
def writeToCassandra(writeDF, epochId):
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="randintstream", keyspace="kafkaspark") \
        .mode("append") \
        .save()
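foreachBatch hands this function each micro-batch together with a monotonically increasing epoch id; a toy pure-Python model of that contract (the names here are illustrative, not Spark API):

```python
# Toy stand-in for Spark's foreachBatch: call the sink function once per
# micro-batch, passing the batch plus an increasing epoch id.
def run_foreach_batch(batches, sink):
    for epoch_id, batch in enumerate(batches):
        sink(batch, epoch_id)

written = []

def write_to_sink(batch, epoch_id):  # plays the role of writeToCassandra
    written.append((epoch_id, batch))

run_foreach_batch([["row1"], ["row2", "row3"]], write_to_sink)
print(written)  # [(0, ['row1']), (1, ['row2', 'row3'])]
```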
Finally, query to write to Cassandra from Spark:
query = df4.writeStream \
    .trigger(processingTime="5 seconds") \
    .outputMode("update") \
    .foreachBatch(writeToCassandra) \
    .start()
SELECT * on the table in Cassandra:
If the row is always rewritten in Cassandra, then you may have an incorrect primary key in the table - you need to make sure that every row has a unique primary key. If you're creating the Cassandra table from Spark, then by default it just takes the first column as the partition key, and that alone may not be unique.
Update after schema was provided:
Yes, that's the case I was referring to - you have a primary key of (partition, topic), but every row you read from a given partition of that topic will have the same value for the primary key, so it will overwrite previous versions. You need to make your primary key unique - for example, add the offset or timestamp columns to the primary key (although timestamp may not be unique if you produce data within the same millisecond).
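Cassandra writes are upserts: a write that repeats an existing primary key replaces that row rather than adding a new one. A minimal pure-Python sketch of why the (partition, topic) key collapses every message into a single row, using a plain dict as a stand-in for the table (values are made up):

```python
# Toy model of a Cassandra table: rows are keyed by their primary key,
# and every write is an upsert (insert-or-replace).
table = {}

def upsert(row, pk_columns):
    key = tuple(row[c] for c in pk_columns)
    table[key] = row  # same key -> previous row silently replaced

rows = [{"partition": 0, "topic": "topic1", "offset": o, "value": o * 10}
        for o in range(3)]

# PRIMARY KEY (partition, topic): all three rows collide on (0, "topic1").
for r in rows:
    upsert(r, pk_columns=("partition", "topic"))
print(len(table))  # 1 - each write overwrote the previous one

# Adding offset to the key makes each row unique, so rows accumulate.
table.clear()
for r in rows:
    upsert(r, pk_columns=("partition", "topic", "offset"))
print(len(table))  # 3
```

In CQL terms the fix would be along the lines of PRIMARY KEY ((partition, topic), offset).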
P.S. Also, with connector 3.0.0 you don't need foreachBatch:
df4.writeStream \
    .trigger(processingTime="5 seconds") \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="randintstream", keyspace="kafkaspark") \
    .outputMode("update") \
    .start()
P.P.S. If you just want to move data from Kafka into Cassandra, you may consider using the DataStax Kafka Connector, which can be much more lightweight compared to Spark.
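As a rough illustration only: a sink instance of the DataStax Apache Kafka Connector is configured with a properties file along these lines. The connector class and the topic.&lt;topic&gt;.&lt;keyspace&gt;.&lt;table&gt;.mapping setting come from the connector's documentation; the concrete addresses, datacenter name, and the field names in the mapping below are placeholder assumptions about this deployment and message layout:

```properties
# Hypothetical sink config - adjust names and addresses to your deployment.
name=cassandra-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=1
topics=topic1
contactPoints=127.0.0.1
loadBalancing.localDc=datacenter1
# Map Kafka record fields to Cassandra columns (field names assumed).
topic.topic1.kafkaspark.randintstream.mapping=key=key, value=value
```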