Writing Spark streaming PySpark dataframe to Cassandra overwrites table instead of appending


Problem description

I'm running a single-node cluster of Kafka, Spark, and Cassandra, all running locally on the same machine.

Through a simple Python script, I stream some dummy data into a Kafka topic every 5 seconds. Then, using Spark Structured Streaming, I read the data stream (one row at a time) into a PySpark DataFrame with startingOffsets = latest. Finally, I try to append each row to an already existing Cassandra table.
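
The producer script itself isn't shown here; for context, a minimal sketch of such a producer (the kafka-python package, the serializers, and the random-integer payload are assumptions for illustration; the topic name topic1 and broker address come from the code further below) might look like this:

from kafka import KafkaProducer   # assumption: the kafka-python package
import random
import time

# Send one record with integer-valued string key/value every 5 seconds,
# matching the string -> integer casts applied on the Spark side below.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: str(k).encode("utf-8"),
    value_serializer=lambda v: str(v).encode("utf-8"),
)

while True:
    producer.send("topic1", key=random.randint(0, 9), value=random.randint(0, 100))
    producer.flush()
    time.sleep(5)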

I have been following (How to write streaming Dataset to Cassandra?) and (Cassandra Sink for PySpark Structured Streaming from Kafka topic).

One row of data is successfully written into the Cassandra table, but my problem is that it gets overwritten every time rather than appended as a new row. What might I be doing wrong?

Here's my code:

CQL DDL for creating the kafkaspark keyspace followed by the randintstream table in Cassandra:

DESCRIBE keyspaces;

CREATE KEYSPACE kafkaspark
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };
  
USE kafkaspark; 

CREATE TABLE randIntStream (
    key int,
    value int,
    topic text,
    partition int,
    offset bigint,
    timestamp timestamp,
    timestampType int,
    PRIMARY KEY (partition, topic)
);

Launch the PySpark shell:

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

Read latest message from Kafka topic into streaming DataFrame:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()

Some transformations and checking schema:

df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
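# Cassandra lower-cases unquoted identifiers, so the table column is 'timestamptype'; rename the DataFrame column to match it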
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()

Function for writing to Cassandra:

def writeToCassandra(writeDF, epochId):
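    # Called once per micro-batch by foreachBatch; the batch DataFrame is written with the regular (batch) Cassandra writer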
    writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="randintstream", keyspace="kafkaspark") \
    .mode("append") \
    .save()

Finally, query to write to Cassandra from Spark:

query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()

SELECT * on table in Cassandra:

Solution

If the row is always rewritten in Cassandra, then you may have an incorrect primary key in the table - you need to make sure that every row has a unique primary key. If you're creating the Cassandra table from Spark, then by default it just takes the first column as the partition key, and on its own that may not be unique.

Update after schema was provided:

Yes, that's the case I was referring to - you have a primary key of (partition, topic), but every row you read from a given partition of that topic will have the same primary key value, so it will overwrite the previous version. You need to make your primary key unique - for example, add the offset or timestamp column to the primary key (although timestamp may not be unique if data is produced within the same millisecond). A sketch of such a table is shown below.
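
For illustration, a minimal sketch of a table with offset added to the primary key (the table name randintstream_unique, the choice of ((topic, partition), offset) as the key layout, and the use of the Python cassandra-driver package are assumptions for the example; the same DDL can equally be run from cqlsh):

from cassandra.cluster import Cluster

# Connect to the local node and create a table whose primary key identifies
# every Kafka record uniquely: (topic, partition) as the partition key and
# offset as the clustering column.
session = Cluster(["127.0.0.1"]).connect("kafkaspark")
session.execute("""
    CREATE TABLE IF NOT EXISTS randintstream_unique (
        key int,
        value int,
        topic text,
        partition int,
        offset bigint,
        timestamp timestamp,
        timestamptype int,
        PRIMARY KEY ((topic, partition), offset)
    )
""")

With a key like this, each new offset within a partition becomes its own row instead of overwriting the previous one.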

P.S. Also, in connector 3.0.0 you don't need foreachBatch:

df4.writeStream \
  .trigger(processingTime="5 seconds") \
  .format("org.apache.spark.sql.cassandra") \
  .options(table="randintstream", keyspace="kafkaspark") \
  .outputMode("update") \
  .start()

P.P.S. If you just want to move data from Kafka into Cassandra, you may consider using the DataStax Kafka Connector, which could be much more lightweight than Spark.
