ERROR: SparkContext can only be used on the driver, not in code that runs on workers. For more information, see SPARK-5063
Problem description
I am presently working with an ASN.1 decoder. I receive a hex code from a producer and collect it in a consumer. I then convert the hex code to an RDD and pass that RDD to another function, decode_module, in the same class, which uses a Python ASN.1 decoder to decode the hex data, return it, and print it. I don't understand what is wrong with my code; I have already installed the ASN.1 parser dependencies on the worker nodes too. Is there anything wrong with the way I call it in the lambda expression, or is it something else?
My ERROR:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that runs on workers. For more information, see SPARK-5063
Please help me.
My code:
class telco_cn:
    def __init__(self, sc):
        self.sc = sc
        print('in init function')
        logging.info('entered into init function')

    def decode_module(self, msg):
        try:
            logging.info('Entered into decode module')
            ### Providing input for module we need to load
            load_module(config_values['load_module'])
            ### Providing value for type of decoding
            ASN1.ASN1Obj.CODEC = config_values['PER_DECODER']
            ### Providing input for align/unalign
            PER.VARIANT = config_values['PER_ALIGNED']
            ### Providing input for pdu load
            pdu = GLOBAL.TYPE[config_values['pdu_load']]
            ### Providing hex value to buf
            buf = '{}'.format(msg).decode('hex')
            return val
        except Exception as e:
            logging.debug('error in decode_module function %s' % str(e))

    def consumer_input(self, sc, k_topic):
        logging.info('entered into consumer input')
        print(k_topic)
        consumer = KafkaConsumer(ip and other values given)
        consumer.subscribe(k_topic)
        for msg in consumer:
            print(msg.value)
            a = sc.parallelize([msg.value])
            d = a.map(lambda x: self.decode_module(x)).collect()
            print d

if __name__ == "__main__":
    logging.info('Entered into main')
    conf = SparkConf()
    conf.setAppName('telco_consumer')
    conf.setMaster('yarn-client')
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)
    cn = telco_cn(sc)
    cn.consumer_input(sc, config_values['kafka_topic'])
Recommended Answer
This is because self.decode_module is a bound method: the lambda you pass to map captures self, and self holds a reference to the SparkContext, so Spark tries to serialize the context along with the function when shipping it to the workers. To fix your code you can use @staticmethod:
class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    @staticmethod
    def decode_module(msg):
        return msg

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(list('abcd'))
        d = a.map(lambda x: telco_cn.decode_module(x)).collect()
        print d

if __name__ == "__main__":
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    cn = telco_cn(sc)
    cn.consumer_input(sc, '')
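To see why the bound method fails, remember that Spark must serialize the function passed to map and ship it to each worker. A lambda like lambda x: self.decode_module(x) closes over self, so the entire instance, including its sc attribute, would have to be serialized. Here is a minimal plain-Python sketch of that mechanism (pyspark is not assumed; a threading.Lock stands in for the unserializable SparkContext):

```python
import pickle
import threading

class Holder:
    def __init__(self):
        # Stand-in for SparkContext: an object pickle refuses to serialize.
        self.sc = threading.Lock()

    def decode_module(self, msg):
        # Bound method: using it from a worker drags `self` (and sc) along.
        return msg.upper()

    @staticmethod
    def decode_static(msg):
        # Static method: no instance state is captured.
        return msg.upper()

obj = Holder()

# A closure over `self` forces the whole instance (and its sc) to be
# serialized -- this is what Spark's serializer would have to do.
try:
    pickle.dumps(obj)
    captured_ok = True
except TypeError:
    captured_ok = False

print(captured_ok)                   # the capture fails
print(Holder.decode_static('abcd'))  # the static version needs no instance
```

The same reasoning explains the error message in the question: it is not the call to decode_module that is illegal, but the implicit serialization of self.sc that the closure triggers.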
For more information:
http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark
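The linked guide also recommends a pattern that avoids the class entirely: define the decode function at module level, so the closure shipped to the workers carries no instance state at all. A hedged sketch (decode_hex is a hypothetical stand-in for the real ASN.1 decoding, and the built-in map simulates what rdd.map(decode_hex) would do on Spark):

```python
import binascii

def decode_hex(msg):
    # Module-level function: serializing it for the workers captures no
    # instance attributes, so no SparkContext is dragged along.
    # Hypothetical stand-in for the real ASN.1 decode step.
    return binascii.unhexlify(msg).decode('ascii')

messages = ['68656c6c6f', '776f726c64']

# On Spark this would be: sc.parallelize(messages).map(decode_hex).collect()
decoded = list(map(decode_hex, messages))
print(decoded)  # ['hello', 'world']
```

If decode_module genuinely needs shared configuration (such as config_values), pass it in as a plain argument or a broadcast variable rather than reading it from self.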