Spark Structured Streaming - Processing each row


Problem description

I am using Structured Streaming with Spark 2.1.1. I need to apply some business logic to incoming messages (from a Kafka source).

Essentially, I need to pick up the message, get some key values, look them up in HBase, and perform some more business logic on the dataset. The end result is a string message that needs to be written out to another Kafka queue.

However, since the abstraction for incoming messages is a DataFrame (an unbounded table in Structured Streaming), I have to iterate through the dataset received during a trigger via mapPartitions (partitions because the HBase client is not serializable).

During my process, I need to iterate through each row to execute the business logic for it.

  1. Is there a better approach that could help me avoid the dataFrame.mapPartitions call? I feel it's sequential and iterative!!
  2. Structured Streaming basically forces me to generate an output DataFrame out of my business process, whereas there is none to start with. What other design pattern can I use to achieve my end goal?

Would you recommend an alternative approach?

Recommended answer

When you talk about working with DataFrames in Spark, speaking very broadly, you can do one of three things: (a) generate a DataFrame, (b) transform a DataFrame, or (c) consume a DataFrame.

In Structured Streaming, a streaming DataFrame is generated using a DataSource. Normally you create sources using the sparkSession.readStream method. This method returns a DataStreamReader, which has several methods for reading from various kinds of input. All of them return a DataFrame. Internally it creates a DataSource. Spark allows you to implement your own DataSource, but they recommend against it, because as of 2.2 the interface is considered experimental.
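For example, a minimal sketch of creating a streaming DataFrame from a Kafka source (the broker address and topic name are placeholders, and the spark-sql-kafka-0-10 package is assumed to be on the classpath):

```scala
import org.apache.spark.sql.SparkSession

object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-streaming-example")
      .getOrCreate()

    // readStream returns a DataStreamReader; format/option/load build the source.
    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder broker
      .option("subscribe", "input-topic")                  // placeholder topic
      .load()

    // The Kafka source exposes key/value as binary; cast them to strings for processing.
    val decoded = messages.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  }
}
```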

You transform DataFrames mostly using map or reduce, or using Spark SQL. There are different flavors of map (map, mapPartitions, mapPartitionsWithIndex), etc. All of them basically take a row and return a row. Internally, Spark does the work of parallelizing the calls to your map method. It partitions the data, spreads it across the executors on the cluster, and calls your map method on the executors. You don't need to worry about parallelism; it's built in under the hood. mapPartitions is not "sequential". Yes, rows within a partition are executed sequentially, but multiple partitions are executed in parallel. You can easily control the degree of parallelism by partitioning your DataFrame. If you have 5 partitions, you will have 5 processes running in parallel. If you have 200, you can have 200 of them running in parallel, provided you have 200 cores.
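A minimal sketch of that idea: parallelism comes from the number of partitions, and mapPartitions lets you create one non-serializable client per partition on the executor. ExpensiveClient is a hypothetical stand-in for something like an HBase connection:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical stand-in for a non-serializable client (e.g. an HBase connection).
class ExpensiveClient {
  def lookup(key: String): String = key.toUpperCase   // placeholder logic
}

object MapPartitionsParallelism {
  def enrich(spark: SparkSession, records: Dataset[String]): Dataset[String] = {
    import spark.implicits._
    records
      .repartition(40)               // 40 partitions => up to 40 tasks running in parallel
      .mapPartitions { rows =>
        // One client per partition, created on the executor and reused for every row.
        val client = new ExpensiveClient()
        rows.map(client.lookup)
      }
  }
}
```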

Note that there is nothing stopping you from going out to external systems that manage state inside your transformation. However, your transformations should be idempotent: given a set of inputs, they should always generate the same output and leave the system in the same state over time. This can be difficult if you are talking to external systems inside your transformation. Structured Streaming provides an at-least-once guarantee, which means that the same row might be transformed multiple times. So, if you are doing something like adding money to a bank account, you might find that you have added the same amount of money twice to some of the accounts.
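One common way to keep such side effects idempotent (a sketch, not something the answer prescribes) is to derive a deterministic identifier from the Kafka metadata, so that a replayed row overwrites the same record instead of being applied twice:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws}

// Build a deterministic key from (topic, partition, offset); a replayed row
// produces the same event_id, so the downstream write can be an upsert.
def withIdempotencyKey(kafkaRows: DataFrame): DataFrame =
  kafkaRows.withColumn(
    "event_id",
    concat_ws("-", col("topic"), col("partition").cast("string"), col("offset").cast("string")))
```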

Data is consumed by sinks. Normally, you add a sink by calling the format method on a DataFrame and then calling start. Structured Streaming has a handful of built-in sinks which (except for one) are more or less useless. You can create your own custom sink, but again it's not recommended because the interface is experimental. The only useful sink is the one you would implement yourself. It is called ForeachSink. Spark will call your foreach sink with all the rows in your partition. You can do whatever you want with the rows, which includes writing them to HBase. Note that, because of the at-least-once nature of Structured Streaming, the same row might be fed to your ForeachSink multiple times. You are expected to implement it in an idempotent manner. Also, if you have multiple sinks, data is written to the sinks in parallel. You cannot control in what order the sinks are called. It can happen that one sink is getting data from one micro-batch while another sink is still processing data for the previous micro-batch. Essentially, the sinks are eventually consistent, not immediately consistent.
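The ForeachSink is attached through a ForeachWriter. Below is a minimal skeleton; the HBase calls themselves are left as placeholder comments, since the exact client API depends on your setup:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Skeleton of a ForeachWriter; Spark calls open/process/close per partition and epoch.
class HBaseForeachWriter extends ForeachWriter[Row] {

  override def open(partitionId: Long, version: Long): Boolean = {
    // Create the non-serializable HBase connection here, on the executor.
    // Returning true tells Spark to go ahead and process this partition.
    true
  }

  override def process(row: Row): Unit = {
    // Write the row, e.g. an HBase put keyed by a deterministic row key.
    // Keep this idempotent: at-least-once delivery means the row may arrive again.
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Release the connection.
  }
}

// Attaching it to a streaming DataFrame `df`:
// df.writeStream.foreach(new HBaseForeachWriter).start()
```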

Generally, the cleanest way to structure your code is to avoid going out to external systems inside your transformations. Your transformations should purely transform the data in DataFrames. If you want data from HBase, get it into a DataFrame, join it with your streaming DataFrame, and then transform it. The reason is that when you go out to external systems, it becomes difficult to scale. You want to scale up your transformations by increasing the partitioning of your DataFrames and adding nodes. However, too many nodes talking to external systems can increase the load on those systems and cause bottlenecks. Separating transformation from data retrieval allows you to scale them independently.
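A sketch of that pattern, where loadReferenceFromHBase is a stand-in for whatever HBase connector read you use:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamStaticJoinSketch {

  // Stand-in for reading reference data through an HBase-Spark connector.
  def loadReferenceFromHBase(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("key1", "value1"), ("key2", "value2")).toDF("lookup_key", "reference_value")
  }

  def enrichStream(spark: SparkSession, stream: DataFrame): DataFrame = {
    // Small, slowly changing reference data: cache it once and reuse it each micro-batch.
    val reference = loadReferenceFromHBase(spark).cache()

    // Stream-static join: each micro-batch of `stream` is joined against the cached table.
    stream.join(reference, Seq("lookup_key"))
  }
}
```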

BUT!!!! There are big buts here......

1) When you talk about Structured Streaming, there is no way to implement a Source that can selectively get data from HBase based on the data in your input. You have to do this inside a map(-like) method. So, IMO, what you have is perfectly fine if the data in HBase changes, or if there is a lot of data that you don't want to keep in memory. If your data in HBase is small and unchanging, then it's better to read it into a batch DataFrame, cache it, and then join it with your streaming DataFrame; Spark will load all the data into its own memory/disk storage and keep it there. If your data is small but changes very frequently, it's better to read it into a DataFrame without caching it and join it with the streaming DataFrame; Spark will then load the data from HBase every time it runs a micro-batch.
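A one-line sketch of the two options (ref stands for the batch DataFrame read from HBase through whichever connector you use):

```scala
import org.apache.spark.sql.DataFrame

// Choose between the two options described above for the reference data.
def prepareReference(ref: DataFrame, changesOften: Boolean): DataFrame =
  if (changesOften) ref        // no cache: re-read from HBase on every micro-batch
  else ref.cache()             // small and unchanging: load once and keep it in Spark
```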

2) There is no way to order the execution of two separate sinks. So, if your requirements demand that you write to a database and then write to Kafka, and you want to guarantee that a row is written to Kafka only after it has been committed in the database, then the only way to do that is to (a) do both writes in a ForeachSink, or (b) write to one system in a map-like function and to the other in a ForeachSink.
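A skeleton of option (a), doing both writes inside a single ForeachWriter so the ordering stays under your control; writeToDatabase and writeToKafka are hypothetical placeholders, not real client APIs:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Both writes happen in one sink, so the database commit always precedes the Kafka write.
class OrderedDualWriter extends ForeachWriter[Row] {

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Row): Unit = {
    writeToDatabase(row)   // 1. commit to the database first
    writeToKafka(row)      // 2. only then publish to Kafka
  }

  override def close(errorOrNull: Throwable): Unit = ()

  // Hypothetical helpers standing in for real database/Kafka clients.
  private def writeToDatabase(row: Row): Unit = ()
  private def writeToKafka(row: Row): Unit = ()
}
```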

Unfortunately, if you have a requirement that makes you read data from a streaming source, join it with data from a batch source, transform it, write it to a database, call an API, get the result from the API, and write the result of the API to Kafka, with those operations done in exactly that order, then the only way you can do this is by implementing the sink logic in a transformation component. You have to make sure you keep the logic separated into separate map functions, so that you can parallelize them in an optimal manner.

Also, there is no good way to know when a micro-batch has been completely processed by your application, especially if you have multiple sinks.

