从Redis读取数据到Flink [英] Read data from Redis to Flink

查看:32
本文介绍了从Redis读取数据到Flink的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在尝试寻找一个连接器来将数据从 Redis 读取到 Flink.Flink 的文档包含写入 Redis 的连接器的描述.我需要在我的 Flink 作业中从 Redis 读取数据.在 使用 Apache Flink 进行数据流传输中,Fabian 提到可以从Redis读取数据.可以用于此目的的连接器是什么?

I have been trying to find a connector to read data from Redis to Flink. Flink's documentation contains the description for a connector to write to Redis. I need to read data from Redis in my Flink job. In Using Apache Flink for data streaming, Fabian has mentioned that it is possible to read data from Redis. What is the connector that can be used for the purpose?

推荐答案

我们正在生产中运行一个大致如下所示的方案

We are running one in production that looks roughly like this

class RedisSource extends RichSourceFunction[SomeDataType] {

  var client: RedisClient = _

  override def open(parameters: Configuration) = {
    client = RedisClient() // init connection etc
  }

  @volatile var isRunning = true

  override def cancel(): Unit = {
    isRunning = false
    client.close()
  }

  override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
      for {
        data <- ??? // get some data from the redis client
      } yield ctx.collect(SomeDataType(data))

  }
} 

我认为这真的取决于您需要从 redis 中获取什么.以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除.Redis 还支持 Pub/Sub,因此可以订阅、抓取 SourceConext 和向下游推送消息.

I think it really depends on what you need to fetch from redis. The above could be used to fetch a message from a list/queue, transform/push and then delete it form the queue. Redis also supports Pub/Sub, so it would possible to subscribe, grab the SourceConext and push messages downstream.

这篇关于从Redis读取数据到Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆