SparkStreaming, RabbitMQ and MQTT in Python using pika
Question
Just to make things tricky, I'd like to consume messages from a RabbitMQ queue. I know there is an MQTT plugin for RabbitMQ (https://www.rabbitmq.com/mqtt.html).
However, I cannot seem to make an example work where Spark consumes a message that has been produced with pika.
For example, I am using the simple wordcount.py program from here (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) to see whether I can read messages from a producer written in the following way:
import sys
import pika
import json
import future
import pprofile

def sendJson(payload):
    # Publish one JSON document to the 'analytics' queue via 'analytics_exchange'.
    # The parameter is named 'payload' so it does not shadow the json module.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='analytics', durable=True)
    channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')
    channel.basic_publish(exchange='analytics_exchange', routing_key='analytics', body=payload)
    connection.close()

if __name__ == "__main__":
    with open(sys.argv[1], 'r') as json_file:
        sendJson(json_file.read())
The Spark consumer is as follows:
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""
brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
However, unlike the simple wordcount example, I cannot get this to work and I get the following error:
16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)
So my questions are: what should the settings be for MQTTUtils.createStream(ssc, brokerUrl, topic) in order to listen to the queue, are there any fuller examples, and how do those settings map onto the RabbitMQ ones?
I am running my consumer code with: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py
I have updated the producer code with TCP parameters, as suggested by one comment:
url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
and the Spark streaming code to:
brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
Recommended answer
It looks like you are using the wrong port number. Assuming that:
- you have a local instance of RabbitMQ running with default settings, you have enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt) and restarted the RabbitMQ server
- you included spark-streaming-mqtt when executing spark-submit / pyspark (either with --packages or with --jars / --driver-class-path)
you can connect using TCP with tcp://localhost:1883. You also have to remember that MQTT uses the amq.topic exchange.
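To make the port and protocol point concrete, here is a minimal sketch of a check that could be run on a broker URL before handing it to MQTTUtils.createStream. The helper name check_mqtt_broker_url and the accepted schemes (tcp, ssl) are assumptions made for illustration; it is not part of Spark, pika or RabbitMQ. It flags the two values tried in the question: "localhost:5672" has no scheme, and "tcp://127.0.0.1:5672" points at the AMQP port rather than the MQTT one.

```python
from urllib.parse import urlsplit

MQTT_DEFAULT_PORT = 1883  # default plain-TCP port of the RabbitMQ MQTT plugin
AMQP_DEFAULT_PORT = 5672  # default AMQP port, the one pika connects to


def check_mqtt_broker_url(url):
    """Return a list of problems found in an MQTT broker URL (hypothetical helper)."""
    if "://" not in url:
        # e.g. "localhost:5672": no protocol given at all
        return ["missing scheme, expected something like tcp://host:1883"]
    problems = []
    parts = urlsplit(url)
    if parts.scheme not in ("tcp", "ssl"):  # assumed set of acceptable schemes
        problems.append("unexpected scheme %r" % parts.scheme)
    if parts.port is None:
        problems.append("no port given, MQTT default is %d" % MQTT_DEFAULT_PORT)
    elif parts.port == AMQP_DEFAULT_PORT:
        problems.append("port 5672 is the AMQP port, MQTT listens on 1883 by default")
    return problems


for candidate in ("localhost:5672", "tcp://127.0.0.1:5672", "tcp://localhost:1883"):
    print(candidate, "->", check_mqtt_broker_url(candidate) or "looks fine")
```

Only the last URL passes, which matches the settings used in the rest of this answer.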
Quick start:
Create a Dockerfile with the following content:
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable rabbitmq_mqtt
Build the Docker image:
docker build -t rabbit_mqtt .
Start the image and wait until the server is ready:
docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt
Create producer.py with the following content:
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
# exchange_type is the keyword in current pika releases;
# older releases also accepted type='topic'
channel.exchange_declare(exchange='amq.topic',
                         exchange_type='topic', durable=True)

for i in range(1000):
    channel.basic_publish(
        exchange='amq.topic',  # amq.topic as exchange
        routing_key='hello',   # Routing key used by producer
        body='Hello World {0}'.format(i)
    )
    time.sleep(3)  # one message every three seconds

connection.close()
Start the producer:
python producer.py
and visit the management console at http://127.0.0.1:15672/#/exchanges/%2F/amq.topic to check that messages are being received.
Create consumer.py with the following content:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext()
ssc = StreamingContext(sc, 10)
mqttStream = MQTTUtils.createStream(
    ssc,
    "tcp://localhost:1883",  # Note both port number and protocol
    "hello"                  # The same routing key as used by producer
)
mqttStream.count().pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()
Download the dependencies (adjust the Scala version to the one used to build Spark, and the Spark version to the one you run):
mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
Make sure SPARK_HOME and PYTHONPATH point to the correct directories.
Submit consumer.py with (adjust versions as before):
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
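Since both the Scala and the Spark version appear inside the package coordinate, it can help to build the command from those two values instead of editing the string by hand. The following is only a sketch: the function names mqtt_package and submit_command are made up for illustration, and the coordinate pattern is simply the one used in the commands above.

```python
def mqtt_package(scala_version, spark_version):
    """Maven coordinate of spark-streaming-mqtt, following the pattern used above."""
    return "org.apache.spark:spark-streaming-mqtt_{0}:{1}".format(
        scala_version, spark_version)


def submit_command(script, scala_version, spark_version):
    """Argument list for spark-submit; it could be passed to subprocess.run."""
    return ["spark-submit",
            "--packages", mqtt_package(scala_version, spark_version),
            script]


# Reproduces the command shown above for Scala 2.11 and Spark 1.6.1
print(" ".join(submit_command("consumer.py", "2.11", "1.6.1")))
```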
If you followed all the steps you should see Hello World messages in the Spark log.
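The example works with the single-word key 'hello', which looks the same on both sides. For multi-level names, my understanding of the RabbitMQ MQTT plugin is that it translates between the AMQP word separator "." and the MQTT level separator "/", and between the single-level wildcards "*" and "+". The sketch below only illustrates that mapping under this assumption; the function names are hypothetical and the code does not talk to a broker.

```python
def amqp_key_to_mqtt_topic(routing_key):
    """Translate an AMQP topic routing key into the MQTT topic a subscriber would use."""
    # "." separates words in AMQP, "/" separates levels in MQTT;
    # "*" (AMQP) and "+" (MQTT) both match exactly one word/level
    return routing_key.replace(".", "/").replace("*", "+")


def mqtt_topic_to_amqp_key(topic):
    """Inverse mapping: MQTT topic to AMQP routing key."""
    return topic.replace("/", ".").replace("+", "*")


print(amqp_key_to_mqtt_topic("hello"))
print(amqp_key_to_mqtt_topic("sensors.kitchen.temp"))
print(mqtt_topic_to_amqp_key("sensors/+/temp"))
```

Under this assumption, a producer publishing to amq.topic with routing key 'sensors.kitchen.temp' would be read in Spark by passing "sensors/kitchen/temp" as the topic to MQTTUtils.createStream.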