Spark Structured Streaming with RabbitMQ source

Problem description

I am trying to write a custom receiver for Structured Streaming that will consume messages from RabbitMQ. Spark recently released the DataSource V2 API, which seems very promising. Since it abstracts away many details, I want to use this API for the sake of both simplicity and performance. However, since it's quite new, there are not many resources available. I need some clarification from experienced Spark folks, since they will grasp the key points more easily. Here we go:

My starting point is the blog post series, with the first part here. It shows how to implement a data source without streaming capability. To make it a streaming source, I slightly changed them, since I need to implement MicroBatchReadSupport instead of (or in addition to) DataSourceV2.
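To make that concrete, here is a minimal sketch of what such an entry point could look like, assuming the Spark 2.3/2.4 DataSource V2 interfaces; the class names (RMQSourceProvider, RMQMicroBatchReader) are my own placeholders, not a tested implementation:

import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

// Hypothetical entry point: the class registered via .format(...) in readStream.
public class RMQSourceProvider implements DataSourceV2, MicroBatchReadSupport {

    @Override
    public MicroBatchReader createMicroBatchReader(
            Optional<StructType> schema,
            String checkpointLocation,
            DataSourceOptions options) {
        // RMQMicroBatchReader is the custom MicroBatchReader discussed below (name assumed).
        return new RMQMicroBatchReader(schema, checkpointLocation, options);
    }
}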

To be efficient, it's wise to have multiple Spark executors consuming from RabbitMQ concurrently, i.e. from the same queue. If I'm not confused, every partition of the input (in Spark's terminology) corresponds to a consumer from the queue (in RabbitMQ terminology). Thus, we need multiple partitions for the input stream, right?
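As a side note, this is roughly how such a source would be wired up from the application side; the provider class name, the option keys, and the paths below are assumptions, intended only to show how the partition option in the snippet that follows would reach the source:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RMQReadExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("rmq-example").getOrCreate();

        // Hypothetical usage: the partition count is passed as a source option and
        // read back when the reader factories are created.
        Dataset<Row> messages = spark.readStream()
                .format("com.example.rabbitmq.RMQSourceProvider") // assumed provider class
                .option("partition", "5")    // desired number of concurrent consumers
                .option("queue", "my-queue") // assumed RabbitMQ queue name
                .load();

        messages.writeStream()
                .format("console")
                .option("checkpointLocation", "/tmp/rmq-checkpoint")
                .start()
                .awaitTermination();
    }
}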

Taking cues from part 4 of the series, I implemented MicroBatchReader as follows:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    // One factory per desired partition; each factory should yield one reader/consumer.
    int partition = options.getInt(RMQ.PARTITION, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}

I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will also be a consumer. Is that approach correct?
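For illustration, one way the RMQDataReaderFactory referenced above might be fleshed out, with each reader opening its own RabbitMQ channel, is sketched below. This assumes the Spark 2.3 V2 reader interfaces and the RabbitMQ Java client; the option keys, defaults, and schema are my guesses rather than a tested implementation:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

// Hypothetical factory: serialized on the driver, shipped to an executor,
// where createDataReader() is called.
public class RMQDataReaderFactory implements DataReaderFactory<Row> {
    private final String host;
    private final String queue;

    public RMQDataReaderFactory(DataSourceOptions options) {
        this.host = options.get("host").orElse("localhost");
        this.queue = options.get("queue").orElse("spark-input");
    }

    @Override
    public DataReader<Row> createDataReader() {
        return new RMQDataReader(host, queue);
    }
}

// Hypothetical reader: one consumer per partition, polling with basicGet.
// Acking is deliberately left out, since where to ack is exactly the open question here.
class RMQDataReader implements DataReader<Row> {
    private final String queue;
    private final Connection connection;
    private final Channel channel;
    private GetResponse current;

    RMQDataReader(String host, String queue) {
        this.queue = queue;
        try {
            ConnectionFactory cf = new ConnectionFactory();
            cf.setHost(host);
            this.connection = cf.newConnection();
            this.channel = connection.createChannel();
        } catch (Exception e) {
            throw new RuntimeException("Could not connect to RabbitMQ", e);
        }
    }

    @Override
    public boolean next() throws IOException {
        current = channel.basicGet(queue, false); // autoAck=false: ack must happen elsewhere
        return current != null;                   // null means the queue is empty right now
    }

    @Override
    public Row get() {
        return RowFactory.create(new String(current.getBody(), StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws IOException {
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}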

I want my receiver to be reliable, i.e. after every message is processed (or at least written to the checkpoint directory for further processing), I need to ack it back to RabbitMQ. The problem starts here: these factories are created at the driver, and the actual reading takes place at the executors through DataReaders. However, the commit method is a part of MicroBatchReader, not DataReader. Since I have many DataReaders per MicroBatchReader, how should I ack these messages back to RabbitMQ? Or should I ack when the next method is called on a DataReader? Is that safe? If so, what is the purpose of the commit function then?

CLARIFICATION (or rather, OBFUSCATION): The link provided in the answer about the renaming of some classes/functions (in addition to the explanations there) made everything worse than ever, rather than clearer. Quoting from there:


Renames:

  • DataReaderFactory to InputPartition
  • DataReader to InputPartitionReader
  ...

InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.

EDIT: However, the docs clearly say that "the reader factory will be serialized and sent to executors, then the data reader will be created on executors and do the actual reading."

To make the consumer reliable, I have to ACK a particular message only after it has been committed on the Spark side. Note that the messages have to be ACKed on the same connection they were delivered through, but the commit function is called at the driver node. How can I commit at the worker/executor node?

Answer


> I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will also be a consumer. Is that approach correct?


The socket source implementation has a single thread pushing messages to an internal ListBuffer. In other words, there is one consumer (a thread) filling the internal ListBuffer, which is then divided into partitions by planInputPartitions (`createDataReaderFactories` got renamed to planInputPartitions).
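To make the pattern explicit, here is a hedged Java rendering (the socket source itself is Scala, and all names below are illustrative only): one background thread consumes into a driver-side buffer, and partition planning merely slices that buffer.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the socket-source pattern: a single consumer thread on the
// driver fills a buffer, and planInputPartitions (formerly createDataReaderFactories)
// only slices that buffer into partitions; the partition readers consume nothing themselves.
public class BufferingReaderSketch {
    private final List<String> batch = new ArrayList<>();

    // Called from the one consumer thread on the driver.
    public synchronized void onMessage(String body) {
        batch.add(body);
    }

    // Called once per micro-batch; each slice would back one input partition.
    public synchronized List<List<String>> planSlices(int numPartitions) {
        List<List<String>> slices = new ArrayList<>();
        if (batch.isEmpty() || numPartitions <= 0) {
            return slices;
        }
        int size = (batch.size() + numPartitions - 1) / numPartitions;
        for (int start = 0; start < batch.size(); start += size) {
            slices.add(new ArrayList<>(batch.subList(start, Math.min(start + size, batch.size()))));
        }
        return slices;
    }
}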


Also, according to the Javadoc of MicroBatchReadSupport,


> The execution engine will create a micro-batch reader at the start of a streaming query, call setOffsetRange and createDataReaderFactories for each batch to process, and then call stop() when the execution is finished. Note that a single query may have multiple executions due to restart or failure recovery.


In other words, `createDataReaderFactories` should be called multiple times, which to my understanding hints that each `DataReader` is responsible for a static input partition, implying that the DataReader shouldn't be a consumer.



----------


> However, the commit method is a part of MicroBatchReader, not DataReader ... If so, what is the purpose of the commit function then?


Part of the reason for the commit function is probably to keep MicroBatchReader's internal buffer from growing large. By committing an offset, you can effectively remove elements smaller than that offset from the buffer, since you are committing to never process them again. You can see this happening in the socket source code with `batches.trimStart(offsetDiff)`.

I'm unsure about implementing a reliable receiver, so I hope a more experienced Spark person comes along and grabs your question, as I'm interested in that too!
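For what it's worth, a hedged Java rendering of that trimming idea follows; the class, field, and method names are mine, and it only mirrors the effect of `batches.trimStart(offsetDiff)` on a simple long-based offset, not the socket source's actual code:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: committing an offset drops everything at or below it from a
// driver-side buffer, so the buffer cannot grow without bound.
public class CommitTrimSketch {
    private final List<String> batch = new ArrayList<>();
    private long lastCommittedOffset = -1L;

    public synchronized void add(String message) {
        batch.add(message);
    }

    // Called when the engine commits everything up to `end`: those entries will
    // never be requested again, so they can be removed from the buffer.
    public synchronized void commit(long end) {
        long offsetDiff = end - lastCommittedOffset;
        if (offsetDiff > 0) {
            int drop = (int) Math.min(offsetDiff, batch.size());
            batch.subList(0, drop).clear(); // Java analogue of ListBuffer.trimStart
            lastCommittedOffset = end;
        }
    }
}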

Hope this helps!


EDIT

I had only studied the socket and wiki-edit sources. Those sources are not production ready, which is something the question was not looking for. Instead, the kafka source is the better starting point: unlike the aforementioned sources, it has multiple consumers, like the author was looking for.

However, if you're looking for an unreliable source, the socket and wiki-edit sources above provide a less complicated solution.
