apache-flink相关内容

优化Flink转换

我有以下方法来计算DataSet中某个值的概率: /** * Compute the probabilities of each value on the given [[DataSet]] * * @param x single colum [[DataSet]] * @return Sequence of probabilites for each value ..
发布时间:2020-05-21 21:28:05 其他开发

使用Kafka作为EventStore时在Flink中恢复状态一致性

问题 我正在将微服务实现为事件源集合,然后将其实现为Flink FlatMapFunction.在基本设置中,聚合从两个kafka主题中读取事件和命令.然后,它将新事件写入该第一个主题,并在第三个主题中处理结果.因此,Kafka充当事件存储.希望此图对您有所帮助: RPC Request RPC Result | ..

Flink:DataSet.count()是瓶颈-如何并行计算?

我正在使用Flink学习Map-Reduce,并且对如何有效地计算数据集中的元素有疑问.到目前为止,我的情况是这样: DataSet ds = ...; long num = ds.count(); 执行此操作时,在我的flink日志中说 12/03/2016 19:47:27 DataSink(count())(1/1)切换为RUNNING 所以只使用了一 ..
发布时间:2020-05-05 15:43:33 Java开发

从FlinkML多元线性回归中提取权重

我正在为Flink(0.10-SNAPSHOT)运行示例多元线性回归.我无法弄清楚如何提取权重(例如,斜率和截距,beta0-beta1,以及您想称呼的权重).我在Scala中经验不足,这可能是我问题的一半. 感谢任何人可以提供的任何帮助. object Job { def main(args: Array[String]) { // set up the execution ..

Apache Flink的吞吐量和延迟

我为Apache Flink编写了一个非常简单的Java程序,现在我对测量统计量感兴趣,例如吞吐量(每秒处理的元组数)和等待时间(程序需要处理每个输入元组的时间). StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.readTextFile("/home/L ..
发布时间:2020-04-29 03:33:34 Java开发

Flink 1.7.0仪表板不显示任务统计信息

我使用Flink 1.7仪表板并选择一个流作业.这应该向我显示一些指标,但仍然需要加载. 我在Flink 1.5集群中部署了相同的作业,并且可以观察指标. Flink在docker swarm中运行,但是如果我在docker-compose(不在swarm中)中运行Flink 1.7,则它可以正常工作 我可以做到,删除docker-compose.yaml文件中的主机名 vers ..
发布时间:2020-04-26 14:14:38 其他开发

Kubernetes上的Apache Flink-如果Jobmanager崩溃则恢复工作

我想使用(持久)状态后端在kubernetes上运行flink作业,看来崩溃的任务管理器没有问题,因为如果我理解正确的话,他们可以询问作业管理器他们需要从哪个检查点恢复. 崩溃的工作经理似乎要困难一些.在此翻转6页我读到动物园管理员需要能够知道工作经理需要使用哪个检查点进行恢复和领导者选举. 看到kubernetes会在崩溃时重新启动jobmanager,是否有办法让新的jobmana ..

如何在flink独立安装上进行kerberos身份验证?

我有一个独立的Flink安装,我想在其上运行一个将数据写入HDFS安装的流作业. HDFS安装是Cloudera部署的一部分,并且需要Kerberos身份验证才能读取和写入HDFS.由于我没有找到有关如何使Flink与受Kerberos保护的HDFS连接的文档,因此我不得不对该过程进行一些有根据的猜测.这是我到目前为止所做的: 我为用户创建了一个密钥表文件. 在Flink工作中,我添加 ..
发布时间:2020-04-25 11:11:43 其他开发

不管窗口时间多长,在Apache Flink中合并两个流

我有两个要合并的数据流.问题在于,一个数据流的频率比另一个数据流的频率高得多,并且有时一个数据流根本不接收事件.是否可以使用一个流中的最后一个事件,并在即将发生的每个事件中将其与另一个流一起加入? 我发现的唯一解决方案是使用join函数,但是您必须指定一个公共窗口,可以在其中应用join函数.当一个流未接收到任何事件时,这是未到达窗口. 是否有可能将连接函数应用于来自一个流或另一个流的 ..
发布时间:2019-09-19 16:14:59 服务器开发

Apache Flink Streaming窗口WordCount

我有以下代码来计算来自socketTextStream的单词。需要累计单词计数和时间窗口单词计数。该程序存在一个问题,即cumulateCounts始终与窗口计数相同。为什么会出现此问题?根据窗口计数计算累积计数的正确方法是什么? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm ..
发布时间:2019-01-09 22:46:57 Java开发

FLINK:如何使用相同的StreamExecutionEnvironment从多个kafka集群中读取

我想从FLINK中的多个KAFKA群集中读取数据。 但结果是 kafkaMessageStream 只能从第一个Kafka读取。 只有当Kafka 分别有 2个流时,才能从两个Kafka集群中读取,这不是我想要的。 是否可以将多个来源连接到单个阅读器。 示例代码 公共类KafkaReader实现Reader { private StreamExe ..
发布时间:2019-01-08 20:23:07 Java开发

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一个元组流。让我们假设一个非常简单的 Tuple1 。元组可以在其值字段中具有任意值(例如,“P1”,“P2”等)。可能值的集合是有限的,但我事先并不知道全集(因此可能存在'P362')。我想根据元组内部的值将该元组写入某个输出位置。所以例如我想拥有以下文件结构: / output / P1 / output / P2 在文档中我只 ..
发布时间:2018-12-27 19:12:23 Java开发

flink - 使用匕首注射 - 不可序列化?

我使用Flink(最新的git)从kafka流到cassandra。为了简化单元测试我通过Dagger添加依赖注入。 ObjectGraph似乎正在正确设置自己,但是'内部对象'被Flink标记为“不可序列化”。如果我直接包含这些对象,那么它们有用吗?那么有什么区别? 有问题的类实现 MapFunction 和 @Inject 一个用于cassandra的模块和一个用于读取配置文件的模 ..
发布时间:2018-12-27 18:03:58 Java开发

Flink中的java.lang.NoSuchMethodError

我尝试使用以下方法读取文件: final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet line = env.readTextFile(“file:///pathtofile/myfile.txt”); 我收到以下错误: ..
发布时间:2018-11-27 11:31:45 Java开发