Is there a way to stream results to driver without waiting for all partitions to complete execution?


Problem description


Is there a way to stream results to the driver without waiting for all partitions to complete execution?

I am new to Spark so please point me in the right direction if there is a better approach. I would like to execute a large number of partitions in parallel and use spark to handle the distribution/restarts etc. As the operations complete, I would like to collect the results into a single archive in the driver.

Use toLocalIterator()

I have been able to do this with toLocalIterator() which according to the docs limits the resources required by the driver. So it basically works.

The problem is that toLocalIterator() not only limits the driver to one partition at a time, but also seems to execute the partitions one at a time. This is not useful for me. The behavior is demonstrated in the demo code below.

Use persist() + count() + toLocalIterator()

I found that I could somewhat get around this by persisting and then triggering parallel execution with a count(). After that, toLocalIterator() is able to pull pre-calculated results quickly.

The problem with this is that I have a large number of partitions (on the order of 10^3 or 10^4) that are sized to take about 15 minutes each. This ends up persisting a lot of data (not a big deal), but much worse, it seems to lose the persistence once the overall job goes on for too long. The partitions end up being recalculated. I'm using Google Dataproc with preemptible workers, so that might have had something to do with it, but I'm pretty sure it ended up recalculating even on the fixed workers... I'm not sure exactly what happened.

In any case, it doesn't seem ideal to have to execute all partitions before having access to the first result.

The demo code below demonstrates the best case when everything persists well and iteration does not trigger recalculation.

??? --> iterate on the data without waiting for full execution

Is there anything like that?

Copy/Paste demo code

import time
import pyspark.storagelevel

def slow_square(n):
    time.sleep(5)
    return n**2


with pyspark.SparkContext() as spark_context:
    numbers = spark_context.parallelize(range(4), 4)  # I think 4 is default executors locally
    squares = numbers.map(slow_square)

    # Use toLocalIterator()
    start = time.time()
    list(squares.toLocalIterator())
    print('toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 20s

    # Use count() to show that it's faster in parallel
    start = time.time()
    squares.count()
    print('count() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 5s

    # Use persist() + count() + toLocalIterator()
    start = time.time()
    squares.persist(pyspark.storagelevel.StorageLevel.MEMORY_AND_DISK)
    squares.count()
    list(squares.toLocalIterator())
    print('persisted toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 5s

Solution

Generally speaking, this is not something you would normally do in Spark. Typically we try to limit the amount of data that is passed through the driver to the minimum. There are two main reasons for that:

  • Passing data to the Spark driver can easily become a bottleneck in your application.
  • The driver is effectively a single point of failure in batch applications.

In the normal case you'd just let the job run, write to persistent storage, and eventually apply further processing steps to the results.
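
As a minimal sketch of that default pattern (assuming the squares RDD built in the question's demo code, and a hypothetical output path), the job could simply write each partition out to durable storage as it completes and merge the part files afterwards:

# Hypothetical sketch: each partition is written to durable storage as soon
# as it is computed; nothing is funneled through the driver.
squares.saveAsTextFile("gs://some-bucket/squares-output")  # path is hypothetical

# A separate step (another Spark job, or plain tooling reading the part
# files) can later merge the output into the single archive the question
# asks for.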

If you want to be able to access the results iteratively you have a few options:

  • Use Spark Streaming. Create a simple process which pushes data to the cluster and then collect each batch on the driver. It is simple, reliable, tested, and doesn't require any additional infrastructure (a sketch is given after this list).
  • Process data using foreach / foreachPartition and push the data to an external messaging system as it is produced, and use another process to consume and write it. This requires an additional component but can be conceptually easier (you can use back pressure, buffer the results, and separate the merging logic from the driver to minimize the risk of application failure). A sketch of the executor side is given after this list.
  • Hack Spark accumulators. Spark accumulators are updated as tasks finish, so you can process the accumulated data in discrete batches as it arrives.

    Warning: The following code is only a proof of concept. It hasn't been properly tested and is most likely highly unreliable.

    Example AccumulatorParam using RXPy

    # results_param.py
    
    from rx.subjects import Subject
    from pyspark import AccumulatorParam, TaskContext
    
    class ResultsParam(AccumulatorParam, Subject):
        """An observable accumulator which collects task results"""
        def zero(self, v):
            return []
    
        def addInPlace(self, acc1, acc2):
            # This is executed on the workers so we have to
            # merge the results
            if (TaskContext.get() is not None and 
                    TaskContext.get().partitionId() is not None):
                acc1.extend(acc2)
                return acc1
            else:
                # This is executed on the driver so we discard the results
                # and publish to self instead
                for x in acc2:
                    self.on_next(x)
                return []
    

    Simple Spark application (Python 3.x):

    # main.py
    
    import time
    from pyspark import SparkContext, TaskContext
    
    sc = SparkContext(master="local[4]")
    sc.addPyFile("results_param.py")
    
    from results_param import ResultsParam
    
    # Define accumulator
    acc = sc.accumulator([], ResultsParam())
    
    # Dummy subscriber 
    acc.accum_param.subscribe(print)
    
    def process(x):
        """Identity proccess"""
        result = x
        acc.add([result])
    
        # Add some delay
        time.sleep(5)
    
        return result
    
    sc.parallelize(range(32), 8).foreach(process)
    

    This is relatively simple, but there is a risk of overwhelming the driver if multiple tasks finish at the same time, so you have to significantly oversubscribe driver resources (proportionally to the parallelism level and the expected size of the task results).

  • Use Scala runJob directly (not Python friendly).

    Spark actually fetches the results asynchronously, and it is not required to wait for all of the data to be processed, as long as you don't care about the order. You can see, for example, the implementation of the Scala reduce.

    It should be possible to use this mechanism to push partitions to the Python process as they come, but I haven't tried it yet.
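
For the Spark Streaming option above, a minimal sketch might look like the following. It assumes a hypothetical feeder process that writes one number per line to localhost:9999 and reuses slow_square from the question's demo code; each micro-batch is collected at the driver as soon as it finishes rather than after the whole job.

# streaming_sketch.py -- hypothetical proof of concept, not tested
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def slow_square(n):
    time.sleep(5)
    return n**2

sc = SparkContext(master="local[4]")
ssc = StreamingContext(sc, batchDuration=10)  # 10 second micro-batches

# A hypothetical external process feeds one number per line to this socket.
numbers = ssc.socketTextStream("localhost", 9999)
squares = numbers.map(lambda line: slow_square(int(line)))

def collect_batch(batch_time, rdd):
    # Only this micro-batch's results are pulled to the driver.
    for result in rdd.collect():
        print(batch_time, result)

squares.foreachRDD(collect_batch)
ssc.start()
ssc.awaitTermination()

For the foreach / foreachPartition option, a sketch of the executor side might look like this. A plain TCP socket stands in for a real messaging system (Kafka, Pub/Sub, ...); the collector host and port are hypothetical, and a separate consumer process would read from that socket and write the archive.

# push_results_sketch.py -- hypothetical proof of concept, not tested
import json
import socket

from pyspark import SparkContext

COLLECTOR_HOST = "collector.example.com"  # hypothetical collector process
COLLECTOR_PORT = 9999                     # hypothetical port

def push_partition(results):
    # Runs on the executors: once a partition has been computed, open a
    # connection and stream its results out instead of returning them
    # through the driver.
    with socket.create_connection((COLLECTOR_HOST, COLLECTOR_PORT)) as sock:
        for result in results:
            sock.sendall((json.dumps(result) + "\n").encode("utf-8"))

sc = SparkContext(master="local[4]")
squares = sc.parallelize(range(4), 4).map(lambda n: n**2)
squares.foreachPartition(push_partition)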
