SparkStreaming, RabbitMQ and MQTT in python using pika


Problem description

Just to make things tricky, I'd like to consume messages from the RabbitMQ queue. Now I know there is a plugin for MQTT on RabbitMQ (https://www.rabbitmq.com/mqtt.html).

However, I cannot seem to make an example work where Spark consumes a message that has been produced from pika.

For example, I am using the simple wordcount.py program from here (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) to see if I can see messages from a producer written in the following way:

import sys
import pika

def sendJson(payload):
  # Connect to a local RabbitMQ broker over AMQP.
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  # Declare a durable queue and bind it to the exchange.
  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                     queue='analytics')

  channel.basic_publish(exchange='analytics_exchange',
                        routing_key='analytics', body=payload)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1], 'r') as json_file:
    sendJson(json_file.read())

The Spark consumer is as follows:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

However, unlike the simple wordcount example, I cannot get this to work; I get the following error:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

So my questions are: what should the settings be in MQTTUtils.createStream(ssc, brokerUrl, topic) to listen to the queue, and are there any fuller examples of how these parameters map onto those of RabbitMQ?

I am running my consumer code with: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

I have updated the producer code as follows with TCP parameters as suggested by one comment:

import os
import pika

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

and the Spark streaming code is:

import json

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#"  # all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()

Answer

It looks like you are using the wrong port number. Assuming that:

  • you have a local instance of RabbitMQ running with default settings, you've enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt), and restarted the RabbitMQ server
  • you included spark-streaming-mqtt when executing spark-submit / pyspark (either with --packages or --jars / --driver-class-path)

you can connect using TCP with tcp://localhost:1883. You also have to remember that the MQTT plugin publishes through the amq.topic exchange.
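
You can check that mapping without Spark at all. Below is a minimal subscriber sketch; it assumes the paho-mqtt package (1.x callback API) is installed (pip install paho-mqtt), which is not part of the original setup. A message published by pika to amq.topic with routing key hello should arrive on MQTT topic hello:

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    # Subscribe once the connection is up; the MQTT topic name matches
    # the AMQP routing key used on the amq.topic exchange.
    client.subscribe("hello")

def on_message(client, userdata, msg):
    print("{0}: {1}".format(msg.topic, msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883)  # MQTT plugin port, not the AMQP port 5672
client.loop_forever()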

Quick start:

  • create a Dockerfile with the following content:

FROM rabbitmq:3-management

RUN rabbitmq-plugins enable rabbitmq_mqtt

  • build Docker image:

    docker build -t rabbit_mqtt .
    

  • start the image and wait until the server is ready (a small readiness-polling sketch follows the command):

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
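    If you script these steps, you can poll the broker instead of guessing when it is up. A minimal sketch, assuming pika is installed on the host and the ports are mapped as above (wait_for_broker is a hypothetical helper, not part of the original answer):

    import time
    import pika

    def wait_for_broker(host='localhost', attempts=30):
        # Try once per second to open (and immediately close) an AMQP connection.
        for _ in range(attempts):
            try:
                pika.BlockingConnection(pika.ConnectionParameters(host=host)).close()
                return True   # broker is accepting connections
            except pika.exceptions.AMQPConnectionError:
                time.sleep(1)  # not ready yet; retry
        return False

    if __name__ == '__main__':
        print('broker ready' if wait_for_broker() else 'broker did not come up')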

  • create producer.py with the following content:

    import pika
    import time


    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    # amq.topic is a pre-declared exchange, so it does not need to be declared;
    # RabbitMQ refuses non-passive declaration of names with the amq.* prefix.

    for i in range(1000):
        channel.basic_publish(
            exchange='amq.topic',  # amq.topic as exchange
            routing_key='hello',   # routing key used by the producer
            body='Hello World {0}'.format(i)
        )
        time.sleep(3)

    connection.close()
    

  • start the producer:

    python producer.py
    

    and visit the management console (http://127.0.0.1:15672/#/exchanges/%2F/amq.topic) to check that messages are being received.

  • create consumer.py with the following content:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils
    
    sc = SparkContext()
    ssc = StreamingContext(sc, 10)
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883",  # Note both port number and protocol
        "hello"                  # The same routing key as used by producer
    )
    mqttStream.count().pprint()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    

  • download dependencies (adjust the Scala version to the one used to build Spark, and the Spark version):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
    

  • make sure SPARK_HOME and PYTHONPATH point to the correct directories.

    submit consumer.py with (adjust versions as before):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
    

  • If you followed all the steps, you should see Hello World messages in the Spark log.
