Spark: Read an inputStream instead of File


Problem Description

I'm using SparkSQL in a Java application to do some processing on CSV files, using the Databricks spark-csv library for parsing.

The data I am processing comes from different sources (Remote URL, local file, Google Cloud Storage), and I'm in the habit of turning everything into an InputStream so that I can parse and process data without knowing where it came from.

All the documentation I've seen on Spark reads files from a path, e.g.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);

DataFrame df = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("path/to/file.csv");

DataFrame dfGrouped = df.groupBy("varA","varB")
    .avg("varC","varD");

dfGrouped.show();

And what I'd like to do is read from an InputStream, or even just an already-in-memory string. Something like the following:

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();

DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stream);

String someString = "imagine,some,csv,data,here";

DataFrame dfFromString = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .read(someString);

Is there something simple I'm missing here?

I've read a bit of the docs on Spark Streaming and custom receivers, but as far as I can tell, this is for opening a connection that will be providing data continuously. Spark Streaming seems to break the data into chunks and do some processing on it, expecting more data to come in an unending stream.

My best guess here is that Spark, as a descendant of Hadoop, expects large amounts of data that probably reside in a filesystem somewhere. But since Spark does its processing in memory anyway, it made sense to me that SparkSQL should be able to parse data that is already in memory.

Any help would be greatly appreciated.

Recommended Answer

You can use at least four different approaches to make your life easier:

  1. Use your input stream, write to a local file (fast with SSD), and read that file with Spark (a sketch follows this list).

  2. Use the Hadoop file system connectors for S3 and Google Cloud Storage and turn everything into a file operation. (That won't solve the issue of reading from an arbitrary URL, since there is no HDFS connector for that.) A configuration sketch follows this list.

  3. Represent the different input types as different URIs and create a utility function that inspects the URI and triggers the appropriate read operation (sketched after this list).

  4. Same as (3), but use case classes instead of a URI and simply overload based on the input type (a Java analogue is sketched after this list).
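
For (1), a minimal sketch could look like the following, reusing the sqlc from the question; the temp-file handling (prefix, deleteOnExit) is just one way to do it:

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();

// Spill the stream to a local temporary file so that Spark has a path to read.
Path tmp = Files.createTempFile("spark-input-", ".csv");
Files.copy(stream, tmp, StandardCopyOption.REPLACE_EXISTING);
tmp.toFile().deleteOnExit();

DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(tmp.toString());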
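
For (2), the connectors are driven through the Hadoop configuration. A hedged sketch, assuming the hadoop-aws (s3a://) and gcs-connector (gs://) jars are on the classpath; the exact property names and credential setup depend on the connector versions, and the bucket/paths below are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// S3 via the s3a connector (hadoop-aws on the classpath).
sc.hadoopConfiguration().set("fs.s3a.access.key", "<ACCESS_KEY>");
sc.hadoopConfiguration().set("fs.s3a.secret.key", "<SECRET_KEY>");

// Google Cloud Storage via the gcs-connector.
sc.hadoopConfiguration().set("fs.gs.impl",
    "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
sc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile",
    "/path/to/keyfile.json");

SQLContext sqlc = new SQLContext(sc);

DataFrame dfFromS3 = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load("s3a://some-bucket/path/to/file.csv");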
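
For (3), one way to sketch the utility (class and method names are made up for illustration): it inspects the URI scheme and either hands the path straight to Spark or falls back to the temp-file trick from (1):

import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class UriCsvReader {

    private final SQLContext sqlc;

    public UriCsvReader(SQLContext sqlc) {
        this.sqlc = sqlc;
    }

    // Decide how the data reaches Spark based on the URI scheme.
    public DataFrame read(URI source) throws Exception {
        String scheme = source.getScheme() == null ? "file" : source.getScheme();
        if (scheme.equals("http") || scheme.equals("https")) {
            // No Hadoop connector for plain URLs: spill to a temp file first, as in (1).
            Path tmp = Files.createTempFile("csv-", ".csv");
            try (InputStream in = source.toURL().openStream()) {
                Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            }
            return loadCsv(tmp.toString());
        }
        // file://, hdfs://, s3a://, gs:// ... can go straight to Spark once the
        // matching connector from (2) is on the classpath.
        return loadCsv(source.toString());
    }

    private DataFrame loadCsv(String path) {
        return sqlc.read()
            .format("com.databricks.spark.csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(path);
    }
}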
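
For (4), case classes are a Scala idiom; the closest analogue in the question's Java code is plain overloading, so callers never build a URI at all. Again, the names are illustrative:

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CsvSources {

    private final SQLContext sqlc;

    public CsvSources(SQLContext sqlc) {
        this.sqlc = sqlc;
    }

    // Local file: hand the path straight to Spark.
    public DataFrame read(File localFile) {
        return loadCsv(localFile.getAbsolutePath());
    }

    // Remote URL: download to a temp file first, as in (1).
    public DataFrame read(URL remoteUrl) throws IOException {
        Path tmp = Files.createTempFile("csv-", ".csv");
        try (InputStream in = remoteUrl.openStream()) {
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        }
        return loadCsv(tmp.toString());
    }

    // CSV already in memory: write it out, then load.
    public DataFrame read(String csvContent) throws IOException {
        Path tmp = Files.createTempFile("csv-", ".csv");
        Files.write(tmp, csvContent.getBytes(StandardCharsets.UTF_8));
        return loadCsv(tmp.toString());
    }

    private DataFrame loadCsv(String path) {
        return sqlc.read()
            .format("com.databricks.spark.csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(path);
    }
}

A caller then just writes new CsvSources(sqlc).read(someString) and never thinks about where the bytes came from.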
