apache-flink相关内容
我在以JSON格式将数据从Flink发送到Kafka主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation下,并且应该相应地传递或失败。 我有如下数
..
我有一个本地运行良好的Flink作业,但在尝试群集上的flink run该作业时失败。它基本上是从卡夫卡中读取,进行一些转换,然后写入水槽。尝试通过'connector' = 'kafka'从Kafka加载数据时出错。 这是我的pom.xml,注:flink-connector-kafka包含在内。
..
例如,如果我的事件顺序为键A,事件顺序为键B,并行度为2。是否所有键A的事件都转到一个任务槽,而键B的事件都转到另一个任务槽? 如果我只按键A的顺序获取事件,会发生什么情况?它们是否也会分发到两个任务槽。这是否意味着我失去了它们出现的顺序? 推荐答案 不,它不完全是这样工作的。 发生的情况是每个键都映射到一个键组,其中键组总数由集群的最大并行度(配置设置)决定。然后将键组映射到
..
不知何故,我无法在单个环境中执行语句集和可查询流,如果我的最后一条语句是flinkEnv.ecute,它将执行可查询流,而不执行语句集中的其他语句,反之亦然 val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(); val tableEnv = StreamTableEnvironment.create(flin
..
我有一个ProcessAllWindowFunction实现(请参阅下面代码中的AttributeBackLogEvents()),它有相当多的I/O,可能需要30秒以上。windowAll()正在使用30秒的TumblingProcessingTimeWindows对数据进行窗口化。 attributedStream .windowAll(TumblingProce
..
如标题所示。虽然getJobId在RuntimeContext中可用,但作业名称不可用。 https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html 尝试从配置中获取似乎效果不佳: @O
..
背景: 我一直在尝试在相同的Flink应用程序中设置Batch+Streaming,该应用程序部署在Kinesis Analytics运行时。流部件工作正常,但添加批处理支持时遇到问题。 Flink : Handling Keyed Streams with data older than application watermark Apache Flink : Batch Mode
..
我安装了Apache Flink,转到flink-1.14.3文件夹并运行.bin/start-cluster.sh。 它似乎已成功启动群集,因为它输出了以下内容- Starting cluster. Starting standalonesession daemon on host MacBook.local. Starting taskexecutor daemon on host
..
为了丰富数据流,我们计划将MySQL(MemSQL)服务器连接到现有的Flink流应用程序 我们可以看到,Flink提供了一个带有JDBC连接器的表APIhttps://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/ 此外,我还发现了另一个名为Flink-CDChttps://verver
..
这是我的表: CREATE TABLE orders ( `id` STRING, `currency_code` STRING, `total` DECIMAL(10,2), `order_time` TIMESTAMP(3), WATERMARK FOR `order_time` AS order_time - INTERVAL '30' SECONDS ) WITH
..
我按照first steps安装Flink。 我可以毫无问题地启动群集 $ start-cluster.sh Starting cluster. Starting standalonesession daemon on host DESKTOP-.... Starting taskexecutor daemon on host DESKTOP-.... 但我没有从 获取任何状态 $
..
我正在学习/试验Flink,我观察到数据流联接的一些意外行为,希望了解发生的情况. 假设我有两个流,每个流有10条记录,我希望在id字段上加入这两个流。让我们假设一个流中的每个记录在另一个流中都有一个匹配的记录,并且ID在每个流中都是唯一的。我们还假设我必须使用全局窗口(要求)。 使用数据流API连接(我在Scala中的简化代码): val stream1 = ... // from
..
我想对安全Kafka集群的Kafka主题支持的Flink SQL表执行查询。我能够以编程方式执行查询,但无法通过Flink SQL Client执行相同的操作。我不确定如何通过Flink SQL客户端传递JAAS配置(java.security.auth.login.config)和其他系统属性。 以编程方式刷新SQL查询 private static void simpleExec
..
大家好,我们有一个使用Flink运算符获得的蓝绿色部署的Flink应用程序。 Flinkk8soperator用于Apache Flink。运营商部署后启动以下三个K8服务: my-flinkapp-14hdhsr (Top level service) my-flinkapp-green my-flinkapp-blue 设想是蓝绿色中的两个中的一个将处于活动状态,并且将具有Po
..
我正在尝试实现一个pyflink作业(通过Table API),在来自多个源的数据转换为标准格式后,该作业从多个源执行一些基本处理。我可以将每个源中的数据转换为所需的格式(具有指定列的“表架构”),但不能将该数据设置为“处理表”以供进一步实现。这就是我要做的: 若要将不同的源架构转换为标准的处理表方案,然后可以对其进行窗口化,请实现表函数(针对多个输入行)等。 编辑: 感谢您建议
..
在Flink 1.11DataStream API page上,有一个使用keyBy()的WindowWordCount程序,但此方法已弃用,我找不到任何不使用keyBy()如何重写它的示例。如有任何建议,我们将不胜感激。 我使用IntelliJ;它警告keyBy()已弃用。这是我能找到的唯一link。 推荐答案 接受整数或字符串参数的keyBy形式已弃用。您应该使用接受KeyS
..
我刚刚尝试使用Rosetta 2兼容层在采用新M1处理器的Apple Mac Pro上运行basic example for Apache Flink。 遗憾的是,它失败,出现以下堆栈跟踪: flink-1.12.2 ./bin/flink run ./examples/streaming/WordCount.jar Executing WordCount example with d
..
我使用Flink SQL来计算基于事件时间的窗口化分析。一切都很正常,直到我的数据源每天晚上变得空闲,之后直到第二天数据再次开始流动时才会产生最后一分钟的结果。 CREATE TABLE input id STRING, data BIGINT, rowtime TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK
..
您好,我正在尝试从一个卡夫卡主题中读取数据,并在进行一些处理后写入到另一个主题中。 当我试图将数据写入另一个主题时,我能够读取数据并对其进行处理。它会显示错误 如果我尝试按原样写入数据,而不对其进行任何处理。Kafka生产者SimpleStringSchema接受它。 但我想将字符串转换为Json。玩Json,然后以字符串格式将其写入另一个主题。 我的代码: import jso
..
我们使用Flink从一些物联网传感器生成事件。每个传感器都可用于生成不同类型的事件(如温度、湿度等)。一对多比率(传感器启用的事件)。 传感器与存储在关系数据库中的启用事件之间的映射 为了丰富传感器数据,我们将连接传感器数据流和表API。正在添加具有已启用事件列表的元数据。 那么,如果某些特定的sensor-123只启用了TEMP和PRESSURE两个事件,如何才能只向这两个定义的流
..