apache-flink相关内容

在运行时正常关闭 Flink Kafka Comsumer

我将 FlinkKafkaConsumer010 与 Flink 1.2.0 一起使用,我面临的问题是:如果看到某些场景,有没有一种方法可以以编程方式关闭整个管道?> 可能的解决方案是,我可以通过调用 FlinkKafkaConsumer010 内部定义的 close() 方法关闭 kafka 消费者源,然后关闭管道.对于这种方法,我创建了一个列表,其中包含对我在管道开头为 kafka 主题创建 ..
发布时间:2021-11-12 02:05:42 其他开发

flink 中的 Kafka 消费者

我正在使用 kafka 和 apache flink.我正在尝试从 apache flink 中的 kafka 主题使用记录(采用 avro 格式).下面是我正在尝试使用的一段代码. 使用自定义反序列化器反序列化主题中的 avro 记录. 我发送到主题“test-topic"的数据的 Avro 模式;如下. {“命名空间":“com.example.flink.avro",“类型": ..
发布时间:2021-11-12 02:04:26 Java开发

是否可以反序列化 Avro 消息(使用来自 Kafka 的消息)而不在 ConfluentRegistryAvroDeserializationSchema 中提供 Reader 模式

我在 Apache Flink 中使用 Kafka 连接器来访问由 Confluent Kafka 提供的流. 除了模式注册表 url ConfluentRegistryAvroDeserializationSchema.forGeneric(...) 期待“阅读器"模式.我不想提供读取模式,而是想使用相同的作者模式(在注册表中查找)来读取消息,因为消费者不会有最新的模式. FlinkKa ..

Scala案例类中init方法的java.lang.NoSuchMethodException

我正在编写一个 Apache Flink 流应用程序,它对从 Kafka 总线读取的数据(Avro 格式)进行反序列化(有关 此处).数据被反序列化为 Scala 案例类.我在运行程序时遇到异常,它收到了来自 Kafka 的第一条消息 线程“main"中的异常 org.apache.flink.runtime.client.JobExecutionException: java.lang.Run ..
发布时间:2021-11-12 02:02:06 其他开发

如何在 Scala 中使用 Flink 的 KafkaSource?

我正在尝试使用 Flink 的 KafkaSource 运行一个简单的测试程序.我正在使用以下内容: Flink 0.9 Scala 2.10.4 卡夫卡 0.8.2.1 我按照文档来测试 KafkaSource(添加依赖项,将 Kafka 连接器 flink-connector-kafka 捆绑在插件中),如 此处 和 此处. 下面是我的简单测试程序: import or ..
发布时间:2021-11-12 02:01:45 其他开发

Apache Flink:Python 流式 API 中的 Kafka 连接器,“无法加载用户类"

我正在尝试 Flink 的新 Python 流 API,并尝试使用 ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py 运行我的脚本.python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或日志目录中的 *.out 文件,默认情况下输出方法在该文件中发出数据). import glob导 ..
发布时间:2021-11-12 01:59:26 Python

Apache Flink:Python 流式 API 中的 Kafka 连接器,“无法加载用户类"

我正在尝试 Flink 的新 Python 流 API,并尝试使用 ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py 运行我的脚本.python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或日志目录中的 *.out 文件,默认情况下输出方法在该文件中发出数据). import glob导 ..
发布时间:2021-11-12 01:59:05 Python

如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化器

我一直致力于更新从 Kafka 读取然后写入 Kafka 的 Flink 处理器(Flink 1.9 版).我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行 2.2 版的新 Kafka 集群.因此,我开始更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer(按照 Flink 文档的建议).但是,我遇到了 K ..
发布时间:2021-11-12 01:45:13 Java开发

Flink如何支持多个KeyBy

在下面的代码示例中,我试图获取一系列员工记录 { Country, Employer, Name, Salary, Age } 并倾销每个国家/地区的最高薪员工.不幸的是,Multiple KEY By 不起作用. 只有 KeyBy(Employer) 在反映,因此我没有得到正确的结果.我错过了什么? StreamExecutionEnvironment env = StreamExecu ..
发布时间:2021-11-12 01:44:32 Java开发

如何在不停机的情况下部署新作业

我有一个 Apache Flink 应用程序,它从单个 Kafka 主题中读取数据.我想不时更新应用程序而不会遇到停机.目前,Flink 应用程序执行一些简单的操作,例如 map 和一些通过 http rest API 到外部系统的同步 IO. 我尝试使用停止命令,但我收到“作业终止 (STOP) 失败:此作业不可停止.",我了解 Kafka 连接器不支持停止行为 - 一个链接!一个简单的解 ..
发布时间:2021-11-12 01:18:35 其他开发

在路线图上演进 TTL?

Flink 1.8 版引入了对进化状态模式的支持.我的问题是 Flink 是否会引入对进化时间(TTL)状态的支持.现在,您无法将 TTL 添加到未配置 TTL 的现有状态并期望能够恢复状态.我很好奇 Flink 未来是否打算开放这种可能性?我现在看到的唯一解决方法是在我的所有状态上启用 TTL,然后只设置到期时间,这样一些令人难以置信的高值让我可以灵活地在以后使用 TTL 功能,如果我仍然对它感 ..
发布时间:2021-11-12 01:18:32 其他开发

Flink 1.2 无法在 HA Cluster 模式下启动

我已经在 HA 集群模式 2 JobManagers 1 TaskManager 本地安装了 Flink 1.2 并且它一直拒绝在这种模式下实际启动显示“启动集群."消息而不是“在 ZooKeeper 仲裁中启动具有 2 个主节点和 1 个对等节点的 HA 集群." 显然在 bin/config.sh 中,它读取的配置如下: # 高可用如果 [ -z "${HIGH_AVAILABILIT ..
发布时间:2021-11-12 01:18:29 其他开发

TaskManager 的 Flink 状态后端

我有一个 Flink v1.2 设置,其中有 1 个 JobManager、2 个 TaskManager,每个都在它自己的 VM 中.我将状态后端配置为文件系统并将其指向上述每个主机的本地位置(state.backend.fs.checkpointdir:file:///home/ubuntu/Prototype/flink/flink-checkpoints).我已将并行度设置为 1,并且每个 ..
发布时间:2021-11-12 01:18:23 其他开发

从hbase读取时Flink抛出序列化错误

当我在地图中使用richfatMapFunction 从hbase 读取时,出现序列化错误.我想要做的是如果数据流等于从 hbase else 忽略读取的特定字符串.下面是我得到的示例程序和错误. 包 com.abb.Flinktest导入 java.text.SimpleDateFormat导入 java.util.Properties导入 scala.collection.concurren ..
发布时间:2021-11-12 01:18:14 其他开发

Apache Flink 从 Kafka 读取 Avro byte[]

在回顾示例时,我看到了很多这样的内容: FlinkKafkaConsumer08kafkaConsumer = new FlinkKafkaConsumer08("myavrotopic", avroSchema, properties); 我看到他们在这里已经知道架构. 在将 byte[] 读入通用记录之前,我不知道架构然后获取架构.(因为它可能会因记录而异) 有人可以将我指向从 ..
发布时间:2021-11-12 01:18:11 Java开发

Flink 1.3 在 YARN 上运行单个作业如何设置每个 TaskManager 的任务槽数

我正在 Yarn 上运行单个 flink 作业 这里. flink run -m yarn-cluster -yn 3 -ytm 12000 我可以通过上面的参数-yn来设置纱线节点/任务管理器的数量.但是我想知道是否可以设置每个任务管理器的任务槽数.当我使用 parallelsim (-p) 参数时,它只设置整体并行度.并且通过将这个值除以提供的任务管理器的数量来计算任务槽的数量.我尝试使用 ..
发布时间:2021-11-12 01:18:08 其他开发