Spark: Read an InputStream instead of File


Question

I'm using SparkSQL in a Java application to do some processing on CSV files, using the Databricks spark-csv library for parsing.

The data I am processing comes from different sources (Remote URL, local file, Google Cloud Storage), and I'm in the habit of turning everything into an InputStream so that I can parse and process data without knowing where it came from.

All the documentation I've seen on Spark reads files from a path, e.g.

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);

DataFrame df = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("path/to/file.csv");

DataFrame dfGrouped = df.groupBy("varA","varB")
    .avg("varC","varD");

dfGrouped.show();

And what I'd like to do is read from an InputStream, or even just an already-in-memory string. Something like the following:

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();

DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stream);

String someString = "imagine,some,csv,data,here";

DataFrame dfFromString = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .read(someString);

Is there something simple I'm missing here?

I've read a bit of the docs on Spark Streaming and custom receivers, but as far as I can tell, this is for opening a connection that will be providing data continuously. Spark Streaming seems to break the data into chunks and do some processing on it, expecting more data to come in an unending stream.

My best guess here is that Spark, as a descendant of Hadoop, expects large amounts of data that probably reside in a filesystem somewhere. But since Spark does its processing in-memory anyway, it made sense to me that SparkSQL should be able to parse data that is already in memory.

Any help would be appreciated.

Answer

You can use at least four different approaches to make your life easier:

1. Use your input stream, write to a local file (fast with an SSD), and read it with Spark. (See the first sketch after this list.)

2. Use the Hadoop file system connectors for S3 and Google Cloud Storage and turn everything into a file operation. (That won't solve the issue of reading from an arbitrary URL, since there is no HDFS connector for that.)

3. Represent the different input types as different URIs and create a utility function that inspects the URI and triggers the appropriate read operation. (See the second sketch after this list.)

4. Same as (3), but use case classes instead of URIs and simply overload based on the input type.
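
As a rough illustration of approach (1), the helper below (hypothetical names, assuming the same Spark 1.x SQLContext and com.databricks.spark.csv format used in the question) stages the stream in a local temporary file and then hands Spark an ordinary path:

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CsvStreamReader {

    // Copy the stream to a temporary file, then read it like any other local CSV.
    public static DataFrame readCsv(SQLContext sqlc, InputStream stream) throws Exception {
        Path tmp = Files.createTempFile("spark-csv-", ".csv");
        tmp.toFile().deleteOnExit();
        Files.copy(stream, tmp, StandardCopyOption.REPLACE_EXISTING);

        return sqlc.read()
            .format("com.databricks.spark.csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(tmp.toUri().toString());
    }

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlc = new SQLContext(sc);

        // Same remote CSV as in the question, staged locally before Spark reads it.
        InputStream stream = new URL(
            "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv").openStream();
        readCsv(sqlc, stream).show();
    }
}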

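And a minimal sketch of approach (3), again with hypothetical names: URIs that Spark/Hadoop can already read (local files, and gs:// once the GCS connector is configured) are passed straight through, while http(s) URLs fall back to the same local staging shown above.

import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CsvSourceDispatcher {

    // Inspect the URI scheme and pick the appropriate read strategy.
    public static DataFrame readCsv(SQLContext sqlc, URI source) throws Exception {
        String scheme = source.getScheme() == null ? "file" : source.getScheme();
        String path;
        switch (scheme) {
            case "file":
            case "gs":   // Google Cloud Storage, assuming the Hadoop GCS connector is on the classpath
                path = source.toString();
                break;
            case "http":
            case "https":
                // No Hadoop connector for arbitrary URLs: stage the content locally first.
                Path tmp = Files.createTempFile("spark-csv-", ".csv");
                tmp.toFile().deleteOnExit();
                try (InputStream in = source.toURL().openStream()) {
                    Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
                }
                path = tmp.toUri().toString();
                break;
            default:
                throw new IllegalArgumentException("Unsupported source: " + source);
        }
        return sqlc.read()
            .format("com.databricks.spark.csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(path);
    }
}

Approach (4) is the same idea expressed through overloads (or Scala case classes) on the input type instead of a switch on the URI scheme.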