Spark Structured Streaming dynamic lookup with Redis


Question

I am new to Spark. We are currently building a pipeline:

  1. Read events from a Kafka topic
  2. Enrich the data with a Redis lookup
  3. Write the events to a new Kafka topic

My problem is that the spark-redis library performs very well, but the data stays static in my streaming job.

Although the data is refreshed in Redis, it is not reflected in my dataframe: Spark reads the data once at startup and never updates it. I load the full dataset from Redis at the start, about 1 million key-value strings.

What approaches/methods can I use? I want to use Redis as an in-memory dynamic lookup, and the lookup table changes roughly every hour.
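One common workaround for the staleness described above: because the UDF captures the `collect()`-ed map in its closure, the snapshot is serialized once and never changes. A TTL-based wrapper can make each executor re-pull the map periodically. This is a minimal sketch, not part of the original post; `RefreshableLookup` is a hypothetical helper, and `loadAll` stands in for a real loader such as `sc.fromRedisKV("PARAMETERS:*").collect().toMap`:

```scala
// Hedged sketch: a TTL-based cache around the lookup snapshot. `loadAll` is a
// placeholder for the real Redis fetch. The @transient fields are re-initialized
// after deserialization, so each executor loads its own copy and refreshes it
// once `ttlMillis` has elapsed.
final class RefreshableLookup(loadAll: () => Map[String, String],
                              ttlMillis: Long) extends Serializable {
  @transient @volatile private var cache: Map[String, String] = _
  @transient @volatile private var loadedAt: Long = 0L

  def get(key: String): String = {
    val now = System.currentTimeMillis()
    if (cache == null || now - loadedAt > ttlMillis) synchronized {
      // double-checked: another thread may have refreshed while we waited
      if (cache == null || System.currentTimeMillis() - loadedAt > ttlMillis) {
        cache = loadAll()
        loadedAt = System.currentTimeMillis()
      }
    }
    cache.getOrElse(key, "")
  }
}
```

With a one-hour TTL, the UDF in the question could then call `refreshable.get(key)` instead of reading the fixed `lookup_map`, matching the roughly hourly change rate of the lookup table.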

Thanks.

Used libraries: spark-redis-2.4.1.jar, commons-pool2-2.0.jar, jedis-3.2.0.jar

Here is the relevant code:

import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events") 


val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
        .alias("C"))


import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*") 
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )



val curated_df = my_raw_events_df 
.select(

     ...
     $"C.SystemEntryDate".alias("RecordCreateDate")
    ,$"C.Profile".alias("ProfileCode")     
    ,lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")
    ,$"C.IdentityType"     
    ,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")     
     ...

).as("C")



import org.apache.spark.sql.streaming.Trigger

val query = curated_df
   .select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
   .option("topic", "curated-event")
   .option("checkpointLocation","/user/spark/checkPointLocation/xyz")
   .trigger(Trigger.ProcessingTime("30 seconds"))
   .start()

   query.awaitTermination()

Answer

One option is to not use spark-redis, but rather look up in Redis directly. This can be achieved with the df.mapPartitions function. You can find some examples for Spark DStreams here: https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/. The idea for Structured Streaming is similar. Be careful to handle the Redis connection properly.
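The per-partition pattern the answer describes can be sketched as follows. This is an illustrative outline, not the answer's own code: `KvClient` is a hypothetical stand-in for a real Redis client such as Jedis, and `enrichPartition` is the kind of function you would pass to `df.mapPartitions`. The key point is opening one connection per partition, reusing it for every row, and closing it only after the rows have been materialized:

```scala
// Hedged sketch of the per-partition lookup pattern. `KvClient` abstracts the
// Redis client (e.g. a Jedis connection obtained from a pool on the executor).
trait KvClient extends AutoCloseable {
  def get(key: String): Option[String]
}

// Enrich one partition's rows: open a connection once, reuse it per row,
// materialize the results, then close. Materializing before close() matters,
// because Iterator.map is lazy and would otherwise run after close().
def enrichPartition(profiles: Iterator[String],
                    openClient: () => KvClient): Iterator[(String, String)] = {
  val client = openClient()
  try {
    profiles.map { profile =>
      val name = client.get(s"PARAMETERS:PROFILE||$profile").getOrElse("")
      (profile, name)
    }.toList.iterator
  } finally {
    client.close()
  }
}
```

In the actual streaming job this body would sit inside `df.mapPartitions { rows => ... }`, so every micro-batch reads the current Redis values instead of a snapshot taken at startup.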
