Spark Structured Streaming - Processing each row

Problem description

I am using Structured Streaming with Spark 2.1.1. I need to apply some business logic to incoming messages (from a Kafka source).

Essentially, I need to pick up the message, get some key values, look them up in HBase, and perform some more business logic on the dataset. The end result is a string message that needs to be written out to another Kafka queue.

However, since the abstraction for incoming messages is a DataFrame (an unbounded table in Structured Streaming), I have to iterate over the dataset received during a trigger using mapPartitions (partitions, because the HBase client is not serializable).

Within that process, I need to iterate over each row to execute the business logic on it.

  1. Is there a better approach that could help me avoid the dataFrame.mapPartitions call? I feel it's sequential and iterative!!
  2. Structured Streaming basically forces me to generate an output DataFrame out of my business process, whereas there is none to start with. What other design pattern can I use to achieve my end goal?

Would you recommend an alternative approach?

Answer

When you talk about working with DataFrames in Spark, speaking very broadly, you can do one of three things: a) generate a DataFrame, b) transform a DataFrame, c) consume a DataFrame.

In Structured Streaming, a streaming DataFrame is generated using a DataSource. Normally you create sources using the sparkSession.readStream method. This method returns a DataStreamReader, which has several methods for reading from various kinds of input. All of these return a DataFrame. Internally it creates a DataSource. Spark allows you to implement your own DataSource, but they recommend against it, because as of 2.2 the interface is considered experimental.
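
As an illustration, here is a minimal sketch of building a streaming DataFrame from a Kafka source via readStream. The application name, broker address, and topic name are placeholders, and the spark-sql-kafka package is assumed to be on the classpath.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("structured-streaming-hbase-lookup")   // hypothetical application name
      .getOrCreate()

    // readStream returns a DataStreamReader; format/option/load produce a streaming DataFrame.
    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
      .option("subscribe", "incoming-topic")               // placeholder topic
      .load()

    // The Kafka source exposes key and value as binary, so cast them for further processing.
    val messages = kafkaDF.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")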

You transform DataFrames mostly using map or reduce, or using Spark SQL. There are different flavors of map (map, mapPartitions, mapPartitionsWithIndex), etc. All of them basically take a row and return a row. Internally, Spark does the work of parallelizing the calls to your map method. It partitions the data, spreads it around on the executors in the cluster, and calls your map method in the executors. You don't need to worry about parallelism; it's built in under the hood. mapPartitions is not "sequential". Yes, rows within a partition are executed sequentially, but multiple partitions are executed in parallel. You can easily control the degree of parallelism by partitioning your DataFrame: if you have 5 partitions, you will have 5 processes running in parallel; if you have 200, you can have 200 of them running in parallel, provided you have 200 cores.
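
Here is a sketch of the mapPartitions pattern the question describes, reusing the hypothetical messages dataset from the sketch above: the HBase connection is created inside each partition so it is never serialized, and repartition controls how many partitions run in parallel. The table name, column family, qualifier, and row-key extraction are all hypothetical.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes

    import spark.implicits._

    val enriched = messages
      .select($"value".as[String])
      .repartition(5)                      // 5 partitions => up to 5 tasks running in parallel
      .mapPartitions { rows =>
        // Created inside the partition, so the non-serializable HBase client never crosses the wire.
        val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val table = connection.getTable(TableName.valueOf("lookup_table"))   // hypothetical table

        val out = rows.map { msg =>
          val rowKey = msg.split(",")(0)                                     // hypothetical key extraction
          val hbaseRow = table.get(new Get(Bytes.toBytes(rowKey)))
          val lookedUp = Option(hbaseRow.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
            .map(b => Bytes.toString(b)).getOrElse("")
          s"$msg|$lookedUp"                                                  // hypothetical business logic
        }.toList                           // materialize before closing the connection

        table.close()
        connection.close()
        out.iterator
      }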

Note that there is nothing stopping you from going out to external systems that manage state inside your transformation. However, your transformations should be idempotent: given a set of inputs, they should always generate the same output and leave the system in the same state over time. This can be difficult if you are talking to external systems inside your transformation. Structured Streaming provides an at-least-once guarantee, which means that the same row might be transformed multiple times. So, if you are doing something like adding money to a bank account, you might find that you have added the same amount of money twice to some of the accounts.

Data is consumed by sinks. Normally, you add a sink by calling the format method on a DataFrame and then calling start. Structured Streaming has a handful of built-in sinks which (except for one) are more or less useless. You can create your own custom sink, but again it's not recommended because the interface is experimental. The only useful sink is the one you would implement yourself: the ForeachSink. Spark will call your foreach sink with all the rows in your partition. You can do whatever you want with the rows, which includes writing them to HBase. Note that because of the at-least-once nature of Structured Streaming, the same row might be fed to your ForeachSink multiple times, so you are expected to implement it in an idempotent manner. Also, if you have multiple sinks, data is written to the sinks in parallel. You cannot control the order in which the sinks are called. It can happen that one sink is getting data from one micro batch while another sink is still processing data for the previous micro batch. Essentially, the sinks are eventually consistent, not immediately consistent.
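
In the Spark 2.1 API the foreach sink is driven by a ForeachWriter passed to writeStream.foreach. Below is a minimal sketch that publishes the enriched strings from the sketch above to another Kafka topic; the broker address and topic name are placeholders, and the writer keeps no deduplication state, so true idempotence would still need to be addressed separately.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.ForeachWriter

    class KafkaForeachWriter extends ForeachWriter[String] {
      @transient private var producer: KafkaProducer[String, String] = _

      override def open(partitionId: Long, version: Long): Boolean = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")   // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producer = new KafkaProducer[String, String](props)
        true   // true means "process the rows of this partition"
      }

      // May be invoked more than once for the same row (at-least-once), so keep it idempotent.
      override def process(value: String): Unit = {
        producer.send(new ProducerRecord[String, String]("outgoing-topic", value))   // placeholder topic
      }

      override def close(errorOrNull: Throwable): Unit = {
        if (producer != null) producer.close()
      }
    }

    val query = enriched.writeStream
      .foreach(new KafkaForeachWriter)
      .start()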

Generally, the cleanest way to build your code is to avoid going out to external systems inside your transformations. Your transformations should purely transform the data in DataFrames. If you want data from HBase, get it into a DataFrame, join it with your streaming DataFrame, and then transform it. This is because when you go out to external systems, it becomes difficult to scale. You want to scale up your transformations by increasing the partitioning of your DataFrames and adding nodes. However, too many nodes talking to external systems can increase the load on the external systems and cause bottlenecks. Separating transformation from data retrieval allows you to scale them independently.
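
A sketch of that join-based approach, again reusing the hypothetical messages frame from above. Which HBase connector loads the lookup table is deployment-specific; the format string, options, table name, column mapping, and join column below are assumptions, and the point is the stream-static join rather than the load itself.

    import org.apache.spark.sql.DataFrame

    // Assumption: an HBase-Spark connector (e.g. the hbase-spark module) is on the classpath.
    def loadHBaseLookup(): DataFrame = {
      spark.read
        .format("org.apache.hadoop.hbase.spark")                  // assumed connector
        .option("hbase.table", "lookup_table")                    // hypothetical table
        .option("hbase.columns.mapping",
          "key STRING :key, lookup_value STRING cf:col")          // hypothetical mapping
        .load()
    }

    val lookupDF = loadHBaseLookup()

    // Stream-static join: streaming side from Kafka, static side from HBase, joined on a
    // hypothetical "key" column. The transformation then works purely on DataFrames.
    val joined = messages.join(lookupDF, "key")
    val output = joined.selectExpr("concat(value, '|', lookup_value) AS value")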

BUT!!!! There are big buts here......

1) When you talk about Structured Streaming, there is no way to implement a Source that can selectively get data from HBase based on the data in your input. You have to do this inside a map(-like) method. So, IMO, what you have is perfectly fine if the data in HBase changes, or if there is a lot of data that you don't want to keep in memory. If your data in HBase is small and unchanging, then it's better to read it into a batch DataFrame, cache it, and then join it with your streaming DataFrame; Spark will load all the data into its own memory/disk storage and keep it there. If your data is small but changes very frequently, it's better to read it into a DataFrame without caching it and join it with the streaming DataFrame; Spark will then load the data from HBase every time it runs a micro batch.
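
The two variants side by side, reusing the hypothetical loadHBaseLookup and messages from the earlier sketches:

    // Small and rarely changing: cache the lookup so Spark keeps it in its own
    // memory/disk storage instead of going back to HBase for every micro batch.
    val cachedLookup = loadHBaseLookup().cache()
    val joinedCached = messages.join(cachedLookup, "key")

    // Small but changing frequently: leave it uncached, so the batch side is
    // read from HBase again each time a micro batch runs.
    val freshLookup = loadHBaseLookup()
    val joinedFresh = messages.join(freshLookup, "key")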

2) There is no way to order the execution of two separate sinks. So, if your requirement requires you to write to a database and write to Kafka, and you want to guarantee that a row is written to Kafka only after it is committed in the database, then the only way to do that is to a) do both writes in a ForeachSink, or b) write to one system in a map-like function and to the other in a foreach sink.
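
A sketch of option a), doing both writes in order inside one ForeachWriter. The JDBC URL, credentials, SQL statement, table, and topic are placeholders, and the usual at-least-once caveats about idempotence still apply.

    import java.sql.{Connection, DriverManager}
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.ForeachWriter

    class DbThenKafkaWriter extends ForeachWriter[String] {
      @transient private var db: Connection = _
      @transient private var producer: KafkaProducer[String, String] = _

      override def open(partitionId: Long, version: Long): Boolean = {
        db = DriverManager.getConnection("jdbc:postgresql://dbhost:5432/app", "user", "password")
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producer = new KafkaProducer[String, String](props)
        true
      }

      override def process(value: String): Unit = {
        // 1. Commit the row to the database first (autocommit is on by default) ...
        val stmt = db.prepareStatement("INSERT INTO processed_messages(payload) VALUES (?)")
        stmt.setString(1, value)
        stmt.executeUpdate()
        stmt.close()
        // 2. ... then publish to Kafka, so the Kafka write always follows the DB commit.
        producer.send(new ProducerRecord[String, String]("outgoing-topic", value)).get()
      }

      override def close(errorOrNull: Throwable): Unit = {
        if (producer != null) producer.close()
        if (db != null) db.close()
      }
    }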

Unfortunately, if you have a requirement that requires you to read data from a streaming source, join it with data from a batch source, transform it, write it to a database, call an API, get the result from the API, and write that result to Kafka, and those operations have to be done in that exact order, then the only way you can do this is by implementing the sink logic in a transformation component. You have to make sure you keep the logic in separate map functions, so you can parallelize them in an optimal manner.

Also, there is no good way to know when a micro batch has been completely processed by your application, especially if you have multiple sinks.
