python send csv data to spark streaming


Problem Description

I would like to try to load CSV data in Python and stream each row to Spark via Spark Streaming.

I'm pretty new to network stuff. I'm not exactly sure whether I'm supposed to create a Python server script that, once it establishes a connection (with Spark Streaming), starts sending each row. In the Spark Streaming documentation they run nc -l 9999, which, if I'm correct, is a netcat server listening on port 9999. So I tried creating a similar Python script that parses a CSV file and sends each row on port 60000:

import socket                   # Import socket module
import csv

port = 60000                    # Reserve a port for your service.
s = socket.socket()             # Create a socket object
host = socket.gethostname()     # Get local machine name
s.bind((host, port))            # Bind to the port
s.listen(5)                     # Now wait for client connection.

print('Server listening....')

while True:
    conn, addr = s.accept()     # Establish connection with client.
    print('Got connection from', addr)

    # Open in text mode; 'rb' would make csv.reader fail in Python 3
    with open('Titantic.csv', 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        for row in reader:
            line = ','.join(row)
            # send() needs bytes in Python 3; the trailing '\n' lets the
            # receiver split the stream back into individual records
            conn.send((line + '\n').encode('utf-8'))
            print(line)

    print('Done sending')
    conn.send(b'Thank you for connecting\n')
    conn.close()
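
As a sanity check independent of Spark, a minimal client sketch (hypothetical, not part of the original question) can connect to this server and print whatever arrives:

import socket

# Hypothetical test client: connect to the CSV server above and dump its output
s = socket.socket()
s.connect((socket.gethostname(), 60000))    # same host/port the server binds
while True:
    data = s.recv(4096)
    if not data:                            # empty bytes => server closed the connection
        break
    print(data.decode('utf-8'), end='')
s.close()

If the rows print here, the server side works and any remaining problem is on the Spark side.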

Spark Streaming script -

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 'sc' is the SparkContext that the pyspark shell / notebook provides
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)

# Split each line into words
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))

data_RDD.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

When I run the Spark script (this is in Jupyter Notebooks, by the way) I get this error - IllegalArgumentException: 'requirement failed: No output operations registered, so nothing to execute'

I don't think I'm writing my socket script properly, but I'm not really sure what to do. I'm basically trying to replicate what nc -lk 9999 does, so that I can send text data over the port while Spark Streaming listens on it, receives the data, and processes it.

Any help would be greatly appreciated.

Recommended Answer

I'm trying to do something similar, but I want to stream a row every 10 seconds. I solved it with this script:

import socket
from time import sleep

host = 'localhost'
port = 12345

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
    print('\nListening for a client at', host, port)
    conn, addr = s.accept()
    print('\nConnected by', addr)
    try:
        print('\nReading file...\n')
        with open('iris_test.csv') as f:
            for line in f:
                # each line keeps its trailing '\n', which is what
                # socketTextStream uses to delimit records
                out = line.encode('utf-8')
                print('Sending line', line)
                conn.send(out)
                sleep(10)
            print('End Of Stream.')
    except socket.error:
        print('Error Occurred.\n\nClient disconnected.\n')
    conn.close()
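
On the receiving side, a minimal sketch of the matching Spark Streaming job (assuming the server above runs on localhost:12345; the master setting and batch interval are illustrative choices, not from the original answer):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create the contexts; skip this if the pyspark shell/notebook already provides 'sc'
sc = SparkContext('local[2]', 'CSVStream')
ssc = StreamingContext(sc, 10)          # 10-second batch interval

# Connect to the socket server above
lines = ssc.socketTextStream('localhost', 12345)

# Split each CSV line into fields
fields = lines.flatMap(lambda line: line.split(','))

# pprint() registers an output operation; calling start() without one
# raises 'requirement failed: No output operations registered'
fields.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

Note that socketTextStream expects UTF-8 text delimited by newlines, which is why the server keeps each line's trailing '\n'.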

Hope this helps.

