Writing large DataFrame from PySpark to Kafka runs into timeout

Question

I'm trying to write a data frame with about 230 million records to Kafka. More specifically to a Kafka-enabled Azure Event Hub, but I'm not sure whether that is actually the source of my issue.

EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
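
Note that the Kafka sink expects the rows to already be serialized into a string or binary column named value (and optionally key). A minimal sketch of that preparation step, assuming a hypothetical source DataFrame dfSource whose rows should be sent as JSON:

from pyspark.sql.functions import to_json, struct, col

# dfSource is a placeholder for the original wide DataFrame.
# All columns are packed into a single JSON string exposed as "value",
# the column the Kafka sink uses as the message body.
dfKafka = dfSource.select(
    to_json(struct([col(c) for c in dfSource.columns])).alias("value")
)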

This starts up fine and writes about 3-4 million records successfully (and pretty fast) to the queue, but then the job stops after a couple of minutes with messages like these:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 7.0 failed 4 times, most recent failure: Lost task 6.3 in stage 7.0 (TID 248, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for mytopic-18: 32839 ms has passed since last append

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 8.0 failed 4 times, most recent failure: Lost task 13.3 in stage 8.0 (TID 348, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: The request timed out.

Also, I never see the checkpoint file being created or written to. I also played around with .option("kafka.delivery.timeout.ms", 30000) and various other values, but that didn't seem to have any effect.

I'm running this on an Azure Databricks cluster, runtime version 5.0 (includes Apache Spark 2.4.0, Scala 2.11).

I don't see any errors such as throttling on my Event Hub, so that side should be fine.

Answer

Finally figured it out (mostly):

It turns out the default batch size of roughly 16000 messages was too large for the endpoint. After setting the batch.size parameter to 5000, the job works and writes about 700k messages per minute to the Event Hub. Also, the timeout parameter I used above was wrong and was simply ignored; the correct option is kafka.request.timeout.ms.

The only remaining issue is that it still randomly runs into timeouts and apparently starts over from the beginning, so I end up with duplicates. I will open a separate question for that.

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
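
For completeness, a sketch of the Structured Streaming variant of the same write, assuming dfKafkaStream is a streaming DataFrame (e.g. built with spark.readStream). The checkpointLocation option is consumed by streaming queries, which would also explain why the batch .write above never produced a checkpoint file:

# Sketch only: dfKafkaStream is an assumed streaming DataFrame;
# all kafka.* options are passed through to the producer as above.
query = dfKafkaStream \
.writeStream \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.start()

query.awaitTermination()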
