How to add aggregated data to the original dataset in Apache Spark?
Problem description
I am trying to figure out how to aggregate data from a dataset and then add the result to the original dataset using Apache Spark. I have tried 2 solutions that I'm not satisfied with, and I wonder if there's a more scalable and efficient solution that I'm not seeing.
Here are very simplified samples of input and expected output data:
Input:
A list of customers, and for each customer, a list of purchased items.
(John, [toast, butter])
(Jane, [toast, jelly])
Output:
A list of customers, and for each customer, a list of purchased items, and for each item, the number of customers who purchased this item.
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])
Here are the solutions I've tried so far, listing steps and output data.
Solution 1:
Start with a pair rdd:
(John, [toast, butter])
(Jane, [toast, jelly])
flatMapToPair:
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)
aggregateByKey:
(toast, [John, Jane])
(butter, [John])
(jelly, [Jane])
flatMapToPair: (using the size of the list of customers)
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])
While this works for a small dataset, it's a terrible idea with a larger one because at one point you hold for each product a huge list of customers that probably won't fit in the executor's memory.
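For reference, the steps above can be sketched with the Scala RDD API (the local SparkSession setup and value names are just for illustration):

```scala
import org.apache.spark.sql.SparkSession

val ss = SparkSession.builder().appName("solution1").master("local[*]").getOrCreate()
val sc = ss.sparkContext

val input = sc.parallelize(Seq(
  ("John", Seq("toast", "butter")),
  ("Jane", Seq("toast", "jelly"))
))

// flatMapToPair: (item, customer)
val itemToCustomer = input.flatMap { case (customer, items) =>
  items.map(item => (item, customer))
}

// aggregateByKey: (item, list of customers) -- the step that can blow up
// for a popular item, since the whole customer list sits on one executor
val itemToCustomers = itemToCustomer.aggregateByKey(List[String]())(
  (acc, c) => c :: acc,
  (a, b) => a ::: b
)

// flatMapToPair using the size of the customer list, then regroup per customer
val result = itemToCustomers
  .flatMap { case (item, customers) =>
    customers.map(c => (c, (item, customers.size)))
  }
  .aggregateByKey(List[(String, Int)]())(
    (acc, v) => v :: acc,
    (a, b) => a ::: b
  )
  .collect()

result.foreach(println)
```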
Solution 2:
Start with a pair rdd:
(John, [toast, butter])
(Jane, [toast, jelly])
flatMapToPair:
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)
aggregateByKey: (counting customers without creating a list)
(toast, 2)
(butter, 1)
(jelly, 1)
join: (using the two previous results)
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))
mapToPair:
(John, (toast, 2))
(John, (butter, 1))
(Jane, (toast, 2))
(Jane, (jelly, 1))
aggregateByKey:
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])
This solution should work, but I feel like there should be some other solution that might not involve joining RDDs.
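A sketch of the join-based steps in Scala (again, the local setup is only illustrative):

```scala
import org.apache.spark.sql.SparkSession

val ss = SparkSession.builder().appName("solution2").master("local[*]").getOrCreate()
val sc = ss.sparkContext

val input = sc.parallelize(Seq(
  ("John", Seq("toast", "butter")),
  ("Jane", Seq("toast", "jelly"))
))

// flatMapToPair: (item, customer)
val itemToCustomer = input.flatMap { case (customer, items) =>
  items.map(item => (item, customer))
}

// aggregateByKey: count customers per item without materializing a list
val itemCounts = itemToCustomer.aggregateByKey(0)(
  (acc, _) => acc + 1,
  _ + _
)

// join the two previous results, flip to (customer, (item, count)),
// then regroup per customer
val result = itemToCustomer
  .join(itemCounts)                                        // (item, (customer, count))
  .map { case (item, (customer, count)) => (customer, (item, count)) }
  .aggregateByKey(List[(String, Int)]())(
    (acc, v) => v :: acc,
    (a, b) => a ::: b
  )
  .collect()

result.foreach(println)
```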
Is there a more scalable/efficient/better "solution 3" to this problem?
Recommended answer
I think another approach would be to use GraphX.
Here is working code (Scala 2.11.12, Spark 2.3.0):
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession

object Main {
  private val ss = SparkSession.builder().appName("").master("local[*]").getOrCreate()
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {
    sc.setLogLevel("ERROR")

    // Class for vertex values
    case class Value(name: String, names: List[String], count: Int)
    // Message that is sent from one vertex to another
    case class Message(names: List[String], count: Int)

    // Simulate input data
    val allData = sc.parallelize(Seq(
      ("John", Seq("toast", "butter")),
      ("Jane", Seq("toast", "jelly"))
    ))

    // Create vertices
    // Goods and people's names - all will become vertices
    val vertices = allData.flatMap(pair =>
      pair._2                  // take all goods bought
        .union(Seq(pair._1))   // add the customer's name
        .map(v => (v.hashCode.toLong, Value(v, List[String](), 0)))) // (id, Value)
    // Hash codes are used because GraphX requires vertex IDs to be Longs

    // Create edges: person --> bought goods
    val edges = allData
      .flatMap(pair =>
        pair._2                // take all goods
          .map(goods => Edge[Int](pair._1.hashCode().toLong, goods.hashCode.toLong, 0))) // (person, bought_good) pairs

    // Create graph from edges and vertices
    val graph = Graph(vertices, edges)

    // Initial message sent to all vertices at the start
    val initialMsg = Message(List[String](), 0)

    // How a vertex should process a received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      if (msg == initialMsg) value                 // just ignore the initial message
      else Value(value.name, msg.names, msg.count) // the received message already contains all our results
    }

    // How vertices should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // Each vertex sends only one message, with its own name and a count of 1
      Iterator((triplet.dstId, Message(List[String](triplet.srcAttr.name), 1)))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // On the goods vertices, the messages from the people who bought them are merged.
      // The final message contains the names of all buyers of this good and their count.
      Message(msg1.names ::: msg2.names, msg1.count + msg2.count)
    }

    // Kick off the Pregel computation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    val values = res.vertices
      .filter(v => v._2.count != 0) // filter out people - they have no incoming edges
      .map(pair => pair._2)         // also remove the IDs

    values // (good, (list of names, count))
      .flatMap(v => v.names.map(n => (n, (v.name, v.count)))) // transform to (name, (good, count))
      .aggregateByKey(List[(String, Int)]())((acc, v) => v :: acc, (acc1, acc2) => acc1 ::: acc2) // aggregate by name
      .collect().foreach(println)   // print the result
  }
}
There is probably a better way to do this with the same approach, but here is the result:
=======================================
(Jane,List((jelly,1), (toast,2)))
(John,List((butter,1), (toast,2)))
UPDATE
This second example is what I was talking about in the comments.
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession

object Main {
  private val ss = SparkSession.builder().appName("").master("local[*]").getOrCreate()
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {
    sc.setLogLevel("ERROR")

    // An entity and how many times it was bought
    case class Entity(name: String, bought: Int)
    // Class for vertex values
    case class Value(name: Entity, names: List[Entity])
    // Message that is sent from one vertex to another
    case class Message(items: List[Entity])

    // Simulate input data
    val allData = sc.parallelize(Seq(
      ("John", Seq("toast", "butter")),
      ("Jane", Seq("toast", "jelly"))
    ))

    // First calculate how many times each Entity was bought
    val counts = allData
      .flatMap(pair => pair._2.map(v => (v, 1))) // flatten all bought items
      .reduceByKey(_ + _)                        // count occurrences
      .map(v => Entity(v._1, v._2))              // create entities

    // Create vertices
    // Goods and people's names - all will become vertices
    val vertices = allData
      .map(pair => Entity(pair._1, 0)) // people are also Entities - but with 0, since they were not bought :)
      .union(counts)
      .map(v => (v.name.hashCode.toLong, Value(Entity(v.name, v.bought), List[Entity]()))) // (id, value)
    // Hash codes are used because GraphX requires vertex IDs to be Longs

    // Create edges: entity --> person
    val edges = allData
      .flatMap(pair =>
        pair._2 // take all goods
          .map(goods => Edge[Int](goods.hashCode.toLong, pair._1.hashCode().toLong, 0)))

    // Create graph from edges and vertices
    val graph = Graph(vertices, edges)

    // Initial message sent to all vertices at the start
    val initialMsg = Message(List[Entity](Entity("", 0)))

    // How a vertex should process a received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      if (msg == initialMsg) value      // just ignore the initial message
      else Value(value.name, msg.items) // the received message already contains all results
    }

    // How vertices should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // Each vertex sends only one message, with its own Entity
      Iterator((triplet.dstId, Message(List[Entity](triplet.srcAttr.name))))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // On the person vertices, messages from the goods they bought are merged.
      // The final message contains every bought good together with its precomputed count.
      Message(msg1.items ::: msg2.items)
    }

    // Kick off the Pregel computation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    res
      .vertices
      .filter(vertex => vertex._2.names.nonEmpty)            // keep only the person vertices
      .map(vertex => (vertex._2.name.name, vertex._2.names)) // remove the vertex IDs
      .collect()                                             // print the results
      .foreach(println)
  }
}