How to log/print message in pyspark pandas_udf?


Problem description

I have tested that neither logger nor print can print a message from inside a pandas_udf, in either cluster mode or client mode.

Test code:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id': ['a'] * 10 + ['b'] * 7 + ['q'] * 3,
    'product_id': ['c'] * 5 + ['d'] * 12 + ['e'] * 3,
}))


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

Also note:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

You can't use this inside a pandas_udf, because this logger belongs to the Spark context object, and you can't refer to the Spark session/context in a udf.

The only way I know is to use an Exception, as in the answer I wrote below. But it is tricky and has a drawback. I want to know if there is any way to just print a message in a pandas_udf.

Answer

Currently, I have tried every way I know of in Spark 2.4.

Without logs it is hard to debug a faulty pandas_udf. The only workable way I know to print an error message from inside a pandas_udf is to raise an Exception. Debugging this way costs time, but I don't know a better way.

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    raise Exception('@'*100)  # the only way I know to surface a message, but it breaks execution
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])
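The idea behind this trick can be shown without Spark at all: the debug payload is packed into the exception text, which travels back to the caller (in Spark, it surfaces in the driver's traceback when an action runs the udf). A minimal pure-Python sketch, with hypothetical names not from the original post:

```python
def udf_body(n_rows):
    # Instead of logging, pack the debug payload into the exception message.
    raise Exception('DEBUG: got %d rows' % n_rows)

try:
    udf_body(10)
except Exception as e:
    captured = str(e)  # in Spark, this text appears in the driver traceback

print(captured)  # DEBUG: got 10 rows
```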

The drawback is that you can't keep Spark running after the message is printed.
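As a side note, one pattern sometimes used in practice (an assumption of mine, not part of the original answer) is a per-process file logger built only from the standard library: each worker process appends to its own local file, so the job keeps running and you inspect the files on the worker nodes afterwards. A sketch:

```python
import logging
import os
import tempfile

def get_udf_logger(log_dir=tempfile.gettempdir()):
    # One log file per worker process, keyed by pid.
    logger = logging.getLogger('udf_debug')
    if not logger.handlers:  # avoid adding duplicate handlers on re-entry
        path = os.path.join(log_dir, 'udf_debug_%d.log' % os.getpid())
        handler = logging.FileHandler(path)
        handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

# Inside the pandas_udf body you would call get_udf_logger().info(...).
lg = get_udf_logger()
lg.info('hello from a worker process')
```

Whether this is practical depends on having filesystem access on the workers and a way to collect the files afterwards.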
