dataflow相关内容

数据流BigQuery-BigQuery管道对较小的数据执行,但不对大型生产数据集执行

这里的数据流有点新手,但已经成功地创建了一个工作良好的管道。 管道从BigQuery读取查询,应用Pardo(NLP函数),然后将数据写入新的BigQuery表。 我尝试处理的数据集大约为500 GB,包含46M条记录。 当我使用相同数据的子集(大约300k记录)尝试此操作时,它工作得很好,而且速度很快,请参见下面的内容: 当我尝试使用完整的数据集运行它时,它启动得非常快, ..

一个数据流作业内的并行管道

我要在GCP上的一个数据流作业内运行两个并行管道。我已经创建了一个管道,并且工作正常,但我希望在不创建另一个作业的情况下创建另一个管道。 我搜索了这么多答案,但没有找到任何代码示例:( 如果我这样运行它,它不工作: pipe1.run(); pipe2.run(); 显示“已有活动作业名称...如果要提交第二个作业,请尝试使用--jobName重新设置其他名称” 推荐答 ..

如何将内联数据集(增量)的参数化链接服务的参数传递到数据流?

我在数据流中有一个增量数据源。为了连接到它,我需要使用参数化的链接服务;但是,我找不到哪里可以为链接的服务参数值寻址: 参数在下面的屏幕截图中突出显示: 在数据流中,我看不到任何提示来解决我的参数值: 谢谢:) 推荐答案 当您创建Delta Lake链接服务时,它会显示为您没有提供参数! 以下是我认为有意义的:Delta格式仅作为内联数据集可用。因为Delta La ..
发布时间:2022-04-04 18:23:48 其他开发

ADF映射数据流,是否可以在源上执行SQL?

因此,我继续将我可爱的SSIS包重写为ADF数据流。然而,在很多情况下,我有一些OLE DB源,其中包含相当复杂的SQL语句,然后是其他转换。 假设有一条SQL语句连接了10个不同的表。据我所知,我只能在我的接收器上执行SQL语句。因此,为了获得稍后使用的完全相同的数据集,我必须创建10个不同的源和10个联接操作。对吗? 这是可能的,但似乎效率不是很高。我唯一想到的另一件事就是重新思考 ..
发布时间:2022-04-04 17:22:42 其他开发

在 Python 中具有依赖关系的惰性数据流(类似电子表格)属性

我的问题如下:我有一些 python 类具有从其他属性派生的属性;并且这些应该在计算后被缓存,并且每次更改基本属性时都应该使缓存的结果无效. 我可以手动完成,但如果属性数量增加,似乎很难维护.所以我想在我的对象中加入类似 Makefile 的规则,以自动跟踪需要重新计算的内容. 所需的语法和行为应该是这样的: # 这会变魔术,比如生成反向依赖图,# 并准备使缓存值无效的设置器@dat ..
发布时间:2022-01-15 22:57:45 Python

在数据流中使用 `fromTable` 和 `fromQuery("SELECT * ...")` 时,`BigQueryIO` 是否有区别?

当您需要在数据流作业中从 bigquery 的一个或多个表中读取所有数据时,我会说有两种方法.第一种方法是将 BigQueryIO 与 from 结合使用,后者读取有问题的表,第二种方法是使用 fromQuery 在其中指定一个从同一个表中读取所有数据的查询.所以我的问题是: 使用其中一种是否有任何成本或性能优势? 我在文档中没有找到任何关于此的内容,但我真的很想知道.我想也许 rea ..
发布时间:2021-12-30 23:14:10 其他开发

避免使用 ActionBlock.Post 当 PostDataflowBlockOptions.BoundedCapacity 不是默认值时?

我听说如果使用 Post 方法而不是 ActionBlock 的 SendAsync 方法,可能会丢失信息对象,当您决定使用它的 BoundedCapacity 属性时. 谁能解释一下为什么会这样? 解决方案 Post 方法尝试同步发布项目并返回 true 或 false,具体取决于关于区块是否接受该项目.不接受项目的原因: 块被标记为完成(通过调用它的 Complete 方法 ..
发布时间:2021-11-24 19:24:50 C#/.NET

避免使用 ActionBlock.Post 当 PostDataflowBlockOptions.BoundedCapacity 不是默认值时?

我听说如果使用 Post 方法而不是 ActionBlock 的 SendAsync 方法,可能会丢失信息对象,当您决定使用它的 BoundedCapacity 属性时. 谁能解释一下为什么会这样? 解决方案 Post 方法尝试同步发布项目并返回 true 或 false,具体取决于关于区块是否接受该项目.不接受项目的原因: 块被标记为完成(通过调用它的 Complete 方法 ..
发布时间:2021-11-24 18:59:04 C#/.NET

javac数据流分析的奇怪误报

我有以下形式的代码: class 测试 {私人最终 A t;公共测试(){为了 ( ... : ... ) {最终 A u = 空;}t = 新 A();}私有类 A {}} 编译器说: 变量 t 可能已经被赋值 有趣的是,如果我对循环执行以下任何更改,它就会起作用! 将循环的内容更改为A u = null 移除循环(但保持final A u = null;) 用经典的计数循环替 ..
发布时间:2021-11-22 16:43:30 Java开发

您将如何显示/布局企业应用程序之间的数据流?

我的雇主是一家大型瑞士电信公司.我们有许多系统用于为不同的任务传输数据,例如性能管理、故障管理、配置管理等 为了向“管理"(尖头发和其他)解释这些系统如何交互,我将有关数据流/格式/协议的信息收集到“数据库"(逗号分隔的说服)中,然后为 Graphviz 生成代码(http://www.graphviz.org/) 和 Yed (http://www.yworks.com/en/produc ..
发布时间:2021-11-17 02:59:55 其他开发

如何在Nifi的虚拟环境中运行具有依赖项的python脚本?

Nifi 有没有办法运行 Python 脚本,该脚本具有从不同文件夹导入的模块、pipfile 中指定的要求以及要传递的参数? 简而言之,如何使用Nifi执行通常在我的虚拟环境中运行的python脚本? 我的最终目标是使用 Get File 获取文件并将其发布到 API.我试过执行进程,执行流命令处理器. 解决方案 要使用 Python 对流文件进行后续处理,您可以使用 Exe ..
发布时间:2021-11-12 03:59:57 其他开发

如何使用 ExecuteScript(以 python 作为脚本引擎)进行添加数字的练习?【新手用户尝试学习NiFi】

我对 NiFi 比较陌生,不确定如何正确执行以下操作.我想使用 ExecuteScript 处理器(脚本引擎:python)来执行以下操作(请仅在 python 中): 1) 有一个包含以下信息的 CSV 文件(第一行是标题): 第一,第二,第三1,4,97,5,23,8,7 2) 我想找到单个行的总和并生成一个带有修改标题的最终文件.最终文件应如下所示: 第一、第二、第三、总1、4、 ..
发布时间:2021-11-12 03:51:53 其他开发

如何应对(Apache Beam)高IO瓶颈?

让我们举一个简单的例子,我有一个非常简单的光束管道,它只是从文件中读取数据并将数据转储到输出文件中.现在让我们考虑输入文件是巨大的(一些 GB 大小,您通常无法在文本编辑器中打开的文件类型).由于直接运行器的实现非常简单(它将整个输入集读取到内存中),它将无法读取和输出这些大文件(除非您为 java vm 进程分配了不切实际的大量内存);所以我的问题是:“像 flink/spark/cloud 数 ..

groupwin 的概念就像未对齐的窗口?

groupwin 我用esper的意思: 此视图根据指定表达式返回的值或表达式列表返回的值组合将事件分组为子视图. 我认为是你有能力按组操作,而不是流(group by 用于控制聚合的分组方式.) 未对齐的窗口 在谷歌的数据流中,未对齐的窗口意味着: 未对齐的窗口是指不跨越整个数据源的窗口,而只是它的一个子集,例如每个用户的窗口. 这些是同一个意思吗?设置 ..