How do I unit test PySpark programs?


Problem description

My current Java/Spark unit-test approach works (detailed here) by instantiating a SparkContext using "local" and running unit tests with JUnit.

The code has to be organized to do I/O in one function and then call another function with multiple RDDs.

This works great. I have a highly tested data transformation written in Java + Spark.

Can I do the same with Python?

How would I run Spark unit tests with Python?

Recommended answer

I'd recommend using py.test as well. py.test makes it easy to create reusable SparkContext test fixtures and use them to write concise test functions. You can also specialize fixtures (to create a StreamingContext, for example) and use one or more of them in your tests.
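Such a reusable spark_context fixture typically lives in a conftest.py next to the tests. A minimal sketch, not taken from the post (the master setting, app name, and fixture scope are illustrative assumptions):

# conftest.py -- sketch of a session-scoped SparkContext fixture for py.test
# (assumes pyspark is importable; configuration values are placeholders)
import pytest
from pyspark import SparkConf, SparkContext


@pytest.fixture(scope="session")
def spark_context(request):
    """Create a local SparkContext shared by all tests in the session."""
    conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-tests")
    sc = SparkContext(conf=conf)
    # Stop the context when the test session finishes.
    request.addfinalizer(lambda: sc.stop())
    return sc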

I wrote a blog post on Medium on this topic:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Here is an excerpt from the post:

import pytest
import wordcount  # module under test, providing do_word_counts()

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    """ test word counting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
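The test assumes a wordcount module with a do_word_counts function that takes an RDD of lines and returns a dict of counts. That module is not part of the excerpt; a possible implementation consistent with the expected results might look like this (a sketch, not from the post):

# wordcount.py -- hypothetical module assumed by the test above
def do_word_counts(lines):
    """Count words across an RDD of whitespace-separated lines; return a dict."""
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    return dict(counts.collect())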
