Apache Spark (Structured Streaming): S3 Checkpoint support


Problem description

From the Spark Structured Streaming documentation: "This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query."

And sure enough, setting the checkpoint to an S3 path throws:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 
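
For reference, a minimal sketch (Scala) of the kind of query setup that produces this error: the checkpointLocation option on the DataStreamWriter points at an s3:// path while the cluster's default filesystem is HDFS. The paths, object name, and the use of the built-in "rate" test source (Spark 2.2+) are illustrative assumptions, not the asker's actual FactDeviceLogsProcessor job.

    import org.apache.spark.sql.SparkSession

    object CheckpointToS3Sketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("checkpoint-to-s3-sketch")
          .getOrCreate()

        // Any streaming source works; the "rate" test source keeps the sketch self-contained.
        val events = spark.readStream
          .format("rate")
          .option("rowsPerSecond", "10")
          .load()

        // Pointing checkpointLocation at S3 is what triggers the
        // "Wrong FS: s3://..., expected: hdfs://..." error above.
        val query = events.writeStream
          .format("parquet")
          .option("path", "s3://my-bucket/fact_output/")                    // hypothetical output path
          .option("checkpointLocation", "s3://my-bucket/fact_checkpoints/") // hypothetical checkpoint path
          .start()

        query.awaitTermination()
      }
    }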

A couple of questions here:

  1. Why is S3 not supported as a checkpoint directory (regular Spark Streaming supports this)? What makes a file system "HDFS compliant"?
  2. I use HDFS only transiently (since clusters can go up and down all the time) and use S3 as the persistent store for all the data. What is the recommendation for storing checkpoint data for Structured Streaming in this kind of setup?

Answer

What makes an FS HDFS "compliant"? It is a file system with the behaviours specified in the Hadoop FS specification. The difference between an object store and a file system is covered there, with the key point being "eventually consistent object stores without append or O(1) atomic renames are not compliant".

For S3 in particular:

  1. It is not consistent: after a new blob is created, a list command often won't show it; the same applies to deletes.
  2. When a blob is overwritten or deleted, it can take a while for the change to become visible.
  3. rename() is implemented as a copy followed by a delete.

Spark Streaming checkpoints by saving everything to a location and then renaming it into the checkpoint directory. This makes the time to checkpoint proportional to the time it takes to copy the data within S3, which is roughly 6-10 MB/s.

The current streaming checkpoint code isn't suited for S3.

For now, do one of the following:

  • Checkpoint to HDFS and then copy the results over (see the sketch after this list).
  • Checkpoint to EBS volumes allocated to and attached to your cluster.
  • Checkpoint to S3, but leave a long gap between checkpoints so that the time to checkpoint doesn't bring your streaming app down.
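
A sketch combining the first and third options, assuming Spark 2.2+ (where Trigger.ProcessingTime is available; on 2.1 the equivalent is ProcessingTime("5 minutes")) and reusing the hypothetical `events` streaming DataFrame from the earlier sketch. Paths are illustrative.

    import org.apache.spark.sql.streaming.Trigger

    val query = events.writeStream
      .format("parquet")
      .option("path", "s3a://my-bucket/fact_output/")             // the data itself can still land in S3
      .option("checkpointLocation", "hdfs:///checkpoints/fact/")  // keep the checkpoint on HDFS
      .trigger(Trigger.ProcessingTime("5 minutes"))                // longer gap between micro-batches/checkpoints
      .start()

If the checkpoint or the results need to end up in S3 afterwards, a periodic hadoop distcp from the HDFS path to an S3 bucket is one way to copy them over.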

If you are using EMR, you can pay the premium for a consistent, DynamoDB-backed S3 (consistent view), which gives you better consistency. But the copy time is still the same, so checkpointing will be just as slow.
