delta-lake相关内容

将运行时7.3LTS(Spark3.0.1)升级到9.1LTS(Spark3.1.2)后创建PySpark DataFrame数据库时,json文件中的重复列引发错误

问题陈述:升级数据库运行时版本时,创建数据帧时出现重复列引发错误。在较低的运行时中,将创建数据帧,并且由于下游不需要重复列,因此在SELECT中简单地将其排除。 文件位置:存储在ADLS Gen2(Azure)上的JSON文件。 集群模式:标准 代码: 我们在Azure数据库中阅读了它,如下所示。 intermediate_df = spark.read.option("multil ..
发布时间:2022-04-11 15:26:55 其他开发

如何将内联数据集(增量)的参数化链接服务的参数传递到数据流?

我在数据流中有一个增量数据源。为了连接到它,我需要使用参数化的链接服务;但是,我找不到哪里可以为链接的服务参数值寻址: 参数在下面的屏幕截图中突出显示: 在数据流中,我看不到任何提示来解决我的参数值: 谢谢:) 推荐答案 当您创建Delta Lake链接服务时,它会显示为您没有提供参数! 以下是我认为有意义的:Delta格式仅作为内联数据集可用。因为Delta La ..
发布时间:2022-04-04 18:23:48 其他开发

如何在 Zeppelin notebook 和 pyspark 中导入 Delta Lake 模块?

我正在尝试在带有 pyspark 的 Zeppelin 笔记本中使用 Delta Lake,但似乎无法成功导入模块.例如 %pyspark从 delta.tables 导入 * 失败并出现以下错误: ModuleNotFoundError: 没有名为“delta"的模块 但是,使用delta格式保存/读取数据帧是没有问题的.如果使用scala spark %spark 可以成功加 ..
发布时间:2021-11-14 23:53:12 其他开发

是否可以删除底层镶木地板文件而不会对 DeltaLake _delta_log 产生负面影响

在 DeltaLake 表上使用 .vacuum() 非常慢(参见 Delta Lake (OSS) 表在 EMR 和 S3 上 - 真空需要很长时间没有工作). 如果我手动删除了底层的 parquet 文件并且没有添加新的 json 日志文件或添加新的 .checkpoint.parquet 文件并更改 _delta_log/_last_checkpoint 指向它的文件;如果有的话,对 ..
发布时间:2021-11-14 23:13:27 其他开发

如何从 Databricks Delta 表中删除一列?

我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况.当我使用 PostgreSQL 时,它就像 一样简单 ALTER TABLE main.metrics_table删除列度量_1; 我正在浏览 Databricks 文档在 DELETE 上,但它仅涵盖删除与谓词匹配的行. 我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何 ..
发布时间:2021-11-14 22:38:23 其他开发

从Kafka主题读取文件路径,然后在结构化流中读取文件并写入DeltaLake

我有一个用例,其中存储在 s3 中的 json 记录的文件路径作为 kafka卡夫卡中的消息.我必须使用 Spark 结构化流处理数据. 我想到的设计如下: 在 kafka Spark 结构化流中,读取包含数据路径的消息. 在驱动程序中收集消息记录.(消息很小) 从数据位置创建数据框. kafkaDf.select($"value".cast(StringType)).wri ..

三角洲湖回滚

需要一种优雅的方式将Delta Lake回滚到以前的版本. 下面列出了我目前的方法: import io.delta.tables._val deltaTable = DeltaTable.forPath(spark,testFolder)spark.read.format("delta").option("versionAsOf",0).load(testFolder).写.mode( ..
发布时间:2021-04-28 20:43:49 其他开发

使用自动加载器从AWS S3加载到Azure Datalake时的增量表事务保证

尝试使用自动加载器,其中AWS S3是源,而Delta Lake在Azure Datalake Gen中.当我尝试读取文件时,出现以下错误 从非AWS写入AWS上的Delta表在提供交易担保方面是不安全的.如果可以保证没有其他人会同时修改同一个Delta表,则可以通过设置SparkConf来关闭此检查:启动集群时为false. 尝试在集群级别设置设置,并且工作正常.我的问题是,有什么方 ..

我收到错误消息“无法将Delta表的时间旅行到版本X".而查看Azure Databricks的历史记录时可以看到版本X

我在三角洲湖中有一张桌子,这些桌子具有以下tblproperties: 但是当我尝试使用这样的命令访问它时: spark.read.format("delta").option("versionAsOf",322).load(path) 我收到此错误: AnalysisException:无法将Delta表的时间计时到版本322.可用版本:[330,341]. 我不明白这个问 ..
发布时间:2021-04-13 19:01:00 其他开发

可以删除基础实木复合地板文件而不会对DeltaLake _delta_log产生负面影响

在DeltaLake表上使用 .vacuum()非常慢(请参阅 Delta Lake(OSS)表在EMR和S3上-真空需要很长时间,没有任何工作). 如果我手动删除了底层实木复合地板文件,并且未添加新的 json 日志文件或未添加新的 .checkpoint.parquet 文件并更改了 _delta_log/_last_checkpoint 指向它的文件;如果有的话,对DeltaLake表 ..
发布时间:2021-04-08 20:30:35 其他开发

如何每隔5分钟获取最近1小时的数据而不进行分组?

如何每5分钟触发一次并获取最近1个小时的数据?我想出了这一点,但似乎并没有给我最后1个小时的所有记录.我的理由是: 读取流, 根据时间戳列过滤最近1小时的数据,并且 使用 forEachbatch 进行写入/打印.还有 为它添加水印,以免保留所有过去的数据. 火花.readStream.format("delta").table("xxx").withWatermark( ..