Spark Streaming: reach dataframe columns and add a new column by looking up Redis

Question
In my previous question (Spark Structured Streaming dynamic lookup with Redis), I succeeded in reaching Redis with mapPartitions, thanks to https://stackoverflow.com/users/689676/fe2s.
I tried to use mapPartitions, but I could not solve one point: how can I reach each row's columns in the code below while iterating? I want to enrich every row against the lookup fields kept in Redis. I found something like the following, but how can I reach the dataframe columns and add a new column by looking up Redis? Any help is much appreciated. Thanks.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.redis.RedisClient // scala-redis client

def transformRow(row: Row): Row = {
  Row.fromSeq(row.toSeq ++ Array[Any]("val1", "val2"))
}

def transformRows(iter: Iterator[Row]): Iterator[Row] = {
  val redisConn = new RedisClient("xxx.xxx.xx.xxx", 6379, 1, Option("Secret123"))
  println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
  // want to reach DataFrame column here
  redisConn.close()
  iter.map(transformRow)
}

val newSchema = StructType(raw_customer_df.schema.fields ++
  Array(
    StructField("ModelValidityPeriod", StringType, false),
    StructField("ModelValidityPeriod2", StringType, false)
  )
)

spark.sqlContext.createDataFrame(raw_customer_df.rdd.mapPartitions(transformRows), newSchema).show
Answer
The iterator iter represents an iterator over the dataframe rows. So if I got your question correctly, you can access column values by iterating over iter and calling

row.getAs[Column_Type](column_name)
Something like this:
def transformRows(iter: Iterator[Row]): Iterator[Row] = {
  val redisConn = new RedisClient("xxx.xxx.xx.xxx", 6379, 1, Option("Secret123"))
  println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
  val res = iter.map { row =>
    // reach the DataFrame column here, per row
    val columnValue = row.getAs[String]("column_name")
    // look the value up in Redis
    val valueFromRedis = redisConn.get(...)
    Row.fromSeq(row.toSeq ++ Array[Any](valueFromRedis))
  }.toList // materialize before closing the connection, since iterators are lazy
  redisConn.close()
  res.iterator
}
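To check the iterator logic without a Spark cluster or a Redis server, here is a minimal plain-Scala sketch of the same per-partition enrichment pattern. A Map stands in for the Redis connection and Seq[Any] stands in for Spark's Row; the key column, the lookup values, and the object name are all hypothetical.

```scala
object EnrichSketch {
  // Hypothetical lookup table playing the role of Redis.
  val lookup: Map[String, String] =
    Map("cust-1" -> "2024-12-31", "cust-2" -> "2025-06-30")

  def transformRows(iter: Iterator[Seq[Any]]): Iterator[Seq[Any]] = {
    // In the real job, the RedisClient would be opened here, once per partition.
    val res = iter.map { row =>
      val key      = row.head.toString          // column value used as the lookup key
      val enriched = lookup.getOrElse(key, "")  // stands in for redisConn.get(key).getOrElse("")
      row :+ enriched                           // append the looked-up value as a new column
    }.toList                                    // materialize before the connection is closed
    // redisConn.close() would go here.
    res.iterator
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator(Seq[Any]("cust-1", "Alice"), Seq[Any]("cust-3", "Bob"))
    transformRows(rows).foreach(println)
  }
}
```

The key point the sketch illustrates is the .toList before closing the connection: because iterators are lazy, the map body would otherwise run after redisConn.close() when Spark finally consumes the iterator.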