Pyspark saving is not working when called from inside a foreach


Question

I am building a pipeline that receives messages from Azure EventHub and saves them into Databricks Delta tables.

All my tests with static data went well; see the code below:

from pyspark.sql import SparkSession

# Sample CSV payload using '|' as the delimiter
body = 'A|B|C|D\n"False"|"253435564"|"14"|"2019-06-25 04:56:21.713"\n"True"|"253435564"|"13"|"2019-06-25 04:56:21.713"\n'
tableLocation = "/delta/tables/myTableName"

spark = SparkSession.builder.appName("CSV converter").getOrCreate()
csvData = spark.sparkContext.parallelize(body.split('\n'))

df = spark.read \
.option("header", True) \
.option("delimiter", "|") \
.option("quote", "\"") \
.option("nullValue", "\\N") \
.option("inferSchema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)

df.write.format("delta").mode("append").save(tableLocation)

However, in my case each EventHub message is a CSV string, and the messages may come from many sources. So each message must be processed separately, because each message may end up saved in a different Delta table.

When I try to execute this same code inside a foreach statement, it doesn't work. No errors show up in the logs, and I can't find any saved table.

So maybe I am doing something wrong when calling the foreach. See the code below:

def SaveData(row):
    # ... the same code as above ...

dfEventHubCSV.rdd.foreach(SaveData)

I tried to do this in a streaming context, but sadly I ran into the same problem.

What is it about the foreach that makes it behave differently?

Below is the full code I am running:

import pyspark.sql.types as t
from pyspark.sql import SparkSession

# row contains the fields Body and SdIds
# Body: CSV string
# SdIds: a string ID
def SaveData(row):

  # Each row's data is going to be added to a different table
  rowInfo = GetDestinationTableData(row['SdIds']).collect()
  table = rowInfo[0][4]
  schema = rowInfo[0][3]
  database = rowInfo[0][2]
  body = row['Body']

  tableLocation = "/delta/" + database + '/' + schema + '/' + table
  checkpointLocation = "/delta/" + database + '/' + schema + "/_checkpoints/" + table

  spark = SparkSession.builder.appName("CSV").getOrCreate()
  csvData = spark.sparkContext.parallelize(body.split('\n'))

  df = spark.read \
  .option("header", True) \
  .option("delimiter", "|") \
  .option("quote", "\"") \
  .option("nullValue", "\\N") \
  .option("inferSchema", "true") \
  .option("mergeSchema", "true") \
  .csv(csvData)

  df.write.format("delta").mode("append").save(tableLocation)

dfEventHubCSV.rdd.foreach(SaveData)

Answer

Well, at the end of it all, as always, it was something very simple, but I didn't see it mentioned anywhere.

Basically, the problem happens when you perform a foreach and the dataframe you want to save is built inside the loop. Unlike the driver, the worker does not automatically resolve the "/dbfs/" path when saving, so if you don't manually add the "/dbfs/" prefix, it will save the data locally on the worker and you will never find the saved data.
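
For illustration, a minimal sketch of that fix, reusing the tableLocation built inside the SaveData function above; the only change assumed here is manually adding the "/dbfs" prefix described in this answer before saving:

  tableLocation = "/delta/" + database + '/' + schema + '/' + table

  # Manually prefix the DBFS mount point so the worker writes to DBFS
  # instead of its own local filesystem (as described above).
  df.write.format("delta").mode("append").save("/dbfs" + tableLocation)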

That is why my loops weren't working.

