Python-将整数或字符串发送到Spark-Streaming [英] Python - Send Integer or String to Spark-Streaming
本文介绍了Python-将整数或字符串发送到Spark-Streaming的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我可以通过CSV文件发送数据.首先,将我的随机数写入CSV文件,然后发送,但是可以直接发送吗?我的套接字代码:
I can send my data through CSV file. First, write my random numbers into CSV file then send it, but is it possible to send it directly? my socket code:
import socket
host = 'localhost'
port = 8080
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
print('\nListening for a client at',host , port)
conn, addr = s.accept()
print('\nConnected by', addr)
try:
print('\nReading file...\n')
while 1:
out = "test01"
print('Sending line', line)
conn.send(out)
except socket.error:
print ('Error Occured.\n\nClient disconnected.\n')
conn.close()
火花流代码:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]","deneme")
ssc = StreamingContext(sc, 10)
socket_stream = ssc.socketTextStream("localhost",8080)
random_integers = socket_stream.window( 30 )
digits = random_integers.flatMap(lambda line: line.split(" ")).map(lambda digit: (digit, 1))
digit_count = digits.reduceByKey(lambda x,y:x+y)
digit_count.pprint()
ssc.start()
推荐答案
这是因为套接字阻止发送数据,并且永远不会继续.最基本的解决方案是发送一些数据并关闭连接:
This is because socket blocks sending the data and never moves on. The most basic solution is to send some amount of data and close the connection:
import socket
import time
host = 'localhost'
port = 50007
i = 0
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
try:
while True:
conn, addr = s.accept()
try:
for j in range(10):
conn.send(bytes("{}\n".format(i), "utf-8"))
i += 1
time.sleep(1)
conn.close()
except socket.error: pass
finally:
s.close()
要获得更多有趣的信息,请检查具有超时的非阻止模式.
To get something more interesting check non-blocking mode with timeouts.
这篇关于Python-将整数或字符串发送到Spark-Streaming的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文