apache-flink相关内容
我将 FlinkKafkaConsumer010 与 Flink 1.2.0 一起使用,我面临的问题是:如果看到某些场景,有没有一种方法可以以编程方式关闭整个管道?> 可能的解决方案是,我可以通过调用 FlinkKafkaConsumer010 内部定义的 close() 方法关闭 kafka 消费者源,然后关闭管道.对于这种方法,我创建了一个列表,其中包含对我在管道开头为 kafka 主题创建
..
我正在使用 kafka 和 apache flink.我正在尝试从 apache flink 中的 kafka 主题使用记录(采用 avro 格式).下面是我正在尝试使用的一段代码. 使用自定义反序列化器反序列化主题中的 avro 记录. 我发送到主题“test-topic"的数据的 Avro 模式;如下. {“命名空间":“com.example.flink.avro",“类型":
..
我在 Apache Flink 中使用 Kafka 连接器来访问由 Confluent Kafka 提供的流. 除了模式注册表 url ConfluentRegistryAvroDeserializationSchema.forGeneric(...) 期待“阅读器"模式.我不想提供读取模式,而是想使用相同的作者模式(在注册表中查找)来读取消息,因为消费者不会有最新的模式. FlinkKa
..
我正在编写一个 Apache Flink 流应用程序,它对从 Kafka 总线读取的数据(Avro 格式)进行反序列化(有关 此处).数据被反序列化为 Scala 案例类.我在运行程序时遇到异常,它收到了来自 Kafka 的第一条消息 线程“main"中的异常 org.apache.flink.runtime.client.JobExecutionException: java.lang.Run
..
我正在尝试使用 Flink 的 KafkaSource 运行一个简单的测试程序.我正在使用以下内容: Flink 0.9 Scala 2.10.4 卡夫卡 0.8.2.1 我按照文档来测试 KafkaSource(添加依赖项,将 Kafka 连接器 flink-connector-kafka 捆绑在插件中),如 此处 和 此处. 下面是我的简单测试程序: import or
..
我正在尝试 Flink 的新 Python 流 API,并尝试使用 ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py 运行我的脚本.python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或日志目录中的 *.out 文件,默认情况下输出方法在该文件中发出数据). import glob导
..
我正在尝试 Flink 的新 Python 流 API,并尝试使用 ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py 运行我的脚本.python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或日志目录中的 *.out 文件,默认情况下输出方法在该文件中发出数据). import glob导
..
kafka 的文档给出了一个方法,如下描述: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 > 实例. 我的代码: public class KafkaConsumerRunner 实现 Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);私有的最终 CloudKafkaCo
..
我一直致力于更新从 Kafka 读取然后写入 Kafka 的 Flink 处理器(Flink 1.9 版).我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行 2.2 版的新 Kafka 集群.因此,我开始更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer(按照 Flink 文档的建议).但是,我遇到了 K
..
在下面的代码示例中,我试图获取一系列员工记录 { Country, Employer, Name, Salary, Age } 并倾销每个国家/地区的最高薪员工.不幸的是,Multiple KEY By 不起作用. 只有 KeyBy(Employer) 在反映,因此我没有得到正确的结果.我错过了什么? StreamExecutionEnvironment env = StreamExecu
..
我有一个 Apache Flink 应用程序,它从单个 Kafka 主题中读取数据.我想不时更新应用程序而不会遇到停机.目前,Flink 应用程序执行一些简单的操作,例如 map 和一些通过 http rest API 到外部系统的同步 IO. 我尝试使用停止命令,但我收到“作业终止 (STOP) 失败:此作业不可停止.",我了解 Kafka 连接器不支持停止行为 - 一个链接!一个简单的解
..
Flink 1.8 版引入了对进化状态模式的支持.我的问题是 Flink 是否会引入对进化时间(TTL)状态的支持.现在,您无法将 TTL 添加到未配置 TTL 的现有状态并期望能够恢复状态.我很好奇 Flink 未来是否打算开放这种可能性?我现在看到的唯一解决方法是在我的所有状态上启用 TTL,然后只设置到期时间,这样一些令人难以置信的高值让我可以灵活地在以后使用 TTL 功能,如果我仍然对它感
..
我已经在 HA 集群模式 2 JobManagers 1 TaskManager 本地安装了 Flink 1.2 并且它一直拒绝在这种模式下实际启动显示“启动集群."消息而不是“在 ZooKeeper 仲裁中启动具有 2 个主节点和 1 个对等节点的 HA 集群." 显然在 bin/config.sh 中,它读取的配置如下: # 高可用如果 [ -z "${HIGH_AVAILABILIT
..
我试图在我的 Flink 工作中与我的 Kafka 源进行并行处理,但到目前为止我失败了. 我为我的 Kafka 制作人设置了 4 个分区: $ ./bin/kafka-topics.sh --describe --zookeeper X.X.X.X:2181 --topic mytopic主题:mytopic PartitionCount:4 ReplicationFactor:1 配置
..
我有一个 Flink v1.2 设置,其中有 1 个 JobManager、2 个 TaskManager,每个都在它自己的 VM 中.我将状态后端配置为文件系统并将其指向上述每个主机的本地位置(state.backend.fs.checkpointdir:file:///home/ubuntu/Prototype/flink/flink-checkpoints).我已将并行度设置为 1,并且每个
..
我正在尝试从 flink 读取 kafka 数据,由于我是 kafka 和 flink 的新手,我不知道如何连接它们. 解决方案 Flink 提供了 Kafka 连接器.为了从 Kafka 主题中读取数据,首先需要添加 Flink -Kafka 连接器依赖项. org.apache.flinkflink-connector-k
..
我想对 Spark 与 Flink 进行基准测试,为此我做了几个测试.但是 Flink 不适用于 Kafka,同时使用 Spark 工作完美. 代码很简单: val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval 属性 = 新属性()properties.setP
..
当我在地图中使用richfatMapFunction 从hbase 读取时,出现序列化错误.我想要做的是如果数据流等于从 hbase else 忽略读取的特定字符串.下面是我得到的示例程序和错误. 包 com.abb.Flinktest导入 java.text.SimpleDateFormat导入 java.util.Properties导入 scala.collection.concurren
..
在回顾示例时,我看到了很多这样的内容: FlinkKafkaConsumer08kafkaConsumer = new FlinkKafkaConsumer08("myavrotopic", avroSchema, properties); 我看到他们在这里已经知道架构. 在将 byte[] 读入通用记录之前,我不知道架构然后获取架构.(因为它可能会因记录而异) 有人可以将我指向从
..
我正在 Yarn 上运行单个 flink 作业 这里. flink run -m yarn-cluster -yn 3 -ytm 12000 我可以通过上面的参数-yn来设置纱线节点/任务管理器的数量.但是我想知道是否可以设置每个任务管理器的任务槽数.当我使用 parallelsim (-p) 参数时,它只设置整体并行度.并且通过将这个值除以提供的任务管理器的数量来计算任务槽的数量.我尝试使用
..