How to Convert Parquet to Spark Delta Lake?

Problem description

I was trying to convert a set of parquet files into delta format in place. I tried using the CONVERT command as mentioned in the Databricks documentation: https://docs.databricks.com/spark/latest/spark-sql/language-manual/convert-to-delta.html

CONVERT TO DELTA parquet.`path/to/table`

I am using Spark 2.4.4 and PySpark (Python version 3.5.3). This is the command I am executing:

spark.sql("转换为 DELTA 实木复合地板.'/usr/spark-2.4.4/data/delta-parquet/'")其中 '/usr/spark-2.4.4/data/delta-parquet/' 是 parquet 文件所在的路径.

spark.sql("CONVERT TO DELTA parquet. '/usr/spark-2.4.4/data/delta-parquet/'") where '/usr/spark-2.4.4/data/delta-parquet/' is the path where the parquet files are located.

But I am getting an exception:

  File "/usr/spark-2.4.4/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/spark-2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'CONVERT' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)

== SQL ==
CONVERT TO DELTA parquet. '/usr/spark-2.4.4/data/delta-parquet/'
^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/spark-2.4.4/python/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/spark-2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/spark-2.4.4/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: "\nmismatched input 'CONVERT' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\nCONVERT TO DELTA parquet. '/usr/spark-2.4.4/data/delta-parquet/'\n^^^\n"

Am I using the CONVERT command in the right way? Any help would be appreciated.

Answer

The CONVERT TO DELTA SQL statement is not part of the open-source Spark 2.4 parser's grammar, which is why the statement fails right at the CONVERT keyword. For PySpark, using the latest Delta Lake version, you can do the same conversion through the Python API instead:

from delta.tables import *

# Convert the Parquet files at the given path into a Delta table, in place
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/usr/spark-2.4.4/data/delta-parquet/`")
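
As a quick check (a minimal sketch, assuming the conversion above succeeded and that the session was started with the Delta Lake package on the classpath, e.g. pyspark --packages io.delta:delta-core_2.11:0.4.0 for Spark 2.4 — the exact version here is an assumption), the same path can now be read back using the delta format:

# Read the converted table back as Delta to verify the conversion worked
df = spark.read.format("delta").load("/usr/spark-2.4.4/data/delta-parquet/")
df.show()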

The convertToDelta example above is taken from the documentation.
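
A related note: if the Parquet files are partitioned, convertToDelta needs the partition schema as a third argument, or the conversion will not pick up the partition columns. A hedged sketch, where the partition column year is a made-up example rather than anything from the question:

from delta.tables import DeltaTable

# Hypothetical partitioned layout: .../delta-parquet/year=2019/... (assumption for illustration)
deltaTable = DeltaTable.convertToDelta(
    spark, "parquet.`/usr/spark-2.4.4/data/delta-parquet/`", "year INT")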
