如何模拟对 pyspark sql 函数的内部调用 [英] How to mock inner call to pyspark sql function
问题描述
得到以下pyspark代码:
Got the following piece of pyspark code:
import pyspark.sql.functions as F
null_or_unknown_count = df.sample(0.01).filter(
F.col('env').isNull() | (F.col('env') == 'Unknown')
).count()
在测试代码中,数据框被模拟,所以我试图为这个调用设置 return_value,如下所示:
In test code, the data frame is mocked, so I am trying to set the return_value for this call like this:
from unittest import mock
from unittest.mock import ANY
@mock.patch('pyspark.sql.DataFrame', spec=pyspark.sql.DataFrame)
def test_null_or_unknown_validation(self, mock_df):
mock_df.sample(0.01).filter(ANY).count.return_value = 250
但这失败了:
File "/usr/local/lib/python3.7/site-packages/pyspark/sql/functions.py", line 44, in _
jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'
还尝试了 mock_df.sample().filter().count.return_value = 250
,它给出了同样的错误.
Also tried mock_df.sample().filter().count.return_value = 250
, which gives the same error.
我如何模拟过滤器,即 F.col('env').isNull() |(F.col('env') == 'Unknown')
正确吗?
How do I mock the filter i.e. F.col('env').isNull() | (F.col('env') == 'Unknown')
correctly?
推荐答案
感谢我在工作中聪明的同事,这就是答案.我们必须模拟 pyspark.sql.functions.col
然后设置一个 return_value.
Thanks to my smart colleague at work, here is the answer. We have to mock pyspark.sql.functions.col
and then set a return_value.
@mock.patch('pyspark.sql.functions.col')
@mock.patch('pyspark.sql.DataFrame', spec=pyspark.sql.DataFrame)
def test_null_or_unknown_validation(self, mock_df, mock_functions):
mock_functions.isNull.return_value = True # (or False also works)
mock_df.sample(0.01).filter(ANY).count.return_value = 250
使用 mock_df.sample().filter().count.return_value = 250
也可以正常工作.
Using mock_df.sample().filter().count.return_value = 250
also works fine.
这篇关于如何模拟对 pyspark sql 函数的内部调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!