PySpark error: AttributeError: 'NoneType' object has no attribute '_jvm'


Problem description


I have a timestamp dataset in the format shown below.

I have written a UDF in PySpark to process this dataset and return the result as a map of key/value pairs, but I am getting the error message below.

Dataset: df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+

PySpark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}

PySpark code:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()
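
For reference, the `udf`, `MapType`, and `StringType` names in the snippet above come from the standard PySpark modules; a minimal sketch of the imports this code assumes:

from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType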

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'

Any help would be appreciated!

Solution

Mariusz's answer didn't really help me. So if, like me, you found this because it's the only result on Google and you're new to PySpark (and Spark in general), here's what worked for me.

In my case, I was getting that error because I was trying to execute PySpark code before the PySpark environment had been set up.

Making sure that PySpark was available and set up before making calls that depend on pyspark.sql.functions fixed the issue for me.
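
As an illustration, here is a minimal sketch of that ordering, reusing the on_time function and df_ts_list DataFrame from the question. The session-building and UDF-registration calls are standard PySpark API, but the variable names and app name are assumptions, not the answerer's exact code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# Create the session (and with it an active SparkContext) first; the helpers
# in pyspark.sql.functions reach through sc._jvm, which is None until then.
spark = SparkSession.builder.appName("on_time_example").getOrCreate()  # app name is an assumption

# Only after the session exists, wrap and apply the UDF
# (on_time and df_ts_list are defined in the question above).
on_time_udf = udf(on_time, MapType(StringType(), StringType()))
df_ts_list.withColumn("one_tag", on_time_udf("ts_list")).select("one_tag").show()

Binding the wrapper to a new name (on_time_udf) rather than reassigning on_time, as the question's snippet does, also keeps the plain Python function available for local testing.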
