Loading a Spark 2.x DataFrame from MongoDB GridFS


Problem description

I've been using the MongoDB Connector for Spark to load DataFrames from MongoDB collections.
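
For context, loading from a regular collection with that connector looks roughly like the sketch below (a minimal sketch, assuming the mongo-spark-connector artifact is on the classpath; the somedb.events collection name is hypothetical):

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

// Hypothetical source collection; the connector reads it via spark.mongodb.input.uri.
val spark = SparkSession.builder()
  .appName("mongo-etl")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/somedb.events")
  .getOrCreate()

val collectionDf = MongoSpark.load(spark) // DataFrame backed by somedb.events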

I'd like to move more of my ETL process into Spark and want to get 1-2 GB files into Spark from a Java service that does the basic file ingestion and parsing. Since I've already got a MongoDB cluster, it'd be easy to drop JSON-line format data into GridFS, and I'd rather not set up a cluster filesystem or HDFS just for this.
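
As an illustration of that ingestion step, here is a minimal sketch using the GridFS API of the synchronous MongoDB Java driver (the connection string, database name somedb, bucket name fs, and local file path are assumptions, chosen to match the fs bucket read by the Spark code below):

import java.io.FileInputStream
import com.mongodb.client.MongoClients
import com.mongodb.client.gridfs.GridFSBuckets

// Upload a JSON-lines file into GridFS; uploadFromStream returns the ObjectId of the
// stored file, which the Spark job can later use to locate it.
val client = MongoClients.create("mongodb://127.0.0.1")
val bucket = GridFSBuckets.create(client.getDatabase("somedb"), "fs")
val in     = new FileInputStream("/tmp/parsed-records.jsonl")
val gridFsId = try bucket.uploadFromStream("parsed-records.jsonl", in) finally in.close()
println(s"Stored GridFS file ${gridFsId.toHexString}")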

The Mongo Spark connector knows nothing of GridFS. The MongoDB Connector for Hadoop does have a GridFSInputFormat, documented in a JIRA comment.

I see the old SparkContext class has a newAPIHadoopFile() method that takes an InputFormat to build an RDD, but I thought SparkSession was the new hotness.

Is it possible to have Spark load a DataFrame from a Hadoop InputFormat like the GridFSInputFormat? I want to read a JSON-lines file from GridFS, infer the schema, and end up with a Dataset[Row]. And is there anything glaringly insane with this approach?

Answer

No big deal in the end. I added the Mongo Hadoop connector:

libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2"

And used it to get an RDD[(NullWritable, Text)], which converts easily to RDD[String] with a call to map, and then to DataFrame with sparkSession.read.json:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BinaryComparable, NullWritable}
import com.mongodb.hadoop.GridFSInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/** Loads a DataFrame from a MongoDB GridFS file in JSON-lines format */
def loadJsonLinesFromGridFSFile(gridFsId: String): DataFrame = {
  jsonLinesToDataFrame(loadRDDFromGridFSFile(gridFsId))
}

/** Uses the Mongo Hadoop plugin to load an RDD of lines from a GridFS file */
private def loadRDDFromGridFSFile(gridFsId: String): RDD[String] = {
  val conf = new Configuration()
  // Point at the GridFS bucket ("fs") in the somedb database; credentials, if any, go in this URI
  conf.set("mongo.input.uri", "mongodb://127.0.0.1/somedb.fs")
  conf.set("mongo.input.format", classOf[GridFSInputFormat].getName)
  // Restrict the input to the single GridFS file identified by its ObjectId
  conf.set("mongo.input.query", s"{ _id: { $$oid: '$gridFsId' } }")
  sparkSession.sparkContext.newAPIHadoopRDD(
    conf, classOf[GridFSInputFormat], classOf[NullWritable], classOf[BinaryComparable]).map(_._2.toString)
}

/** Parses the RDD of JSON-lines strings into a DataFrame, letting Spark infer the schema */
private def jsonLinesToDataFrame(rdd: RDD[String]): DataFrame = {
  sparkSession.read.json(rdd)
}
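
Hypothetical usage, passing the hex string of the GridFS file's ObjectId (the id below is made up):

val df = loadJsonLinesFromGridFSFile("507f1f77bcf86cd799439011")
df.printSchema()              // schema inferred from the JSON lines
df.show(5, truncate = false)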

