Core compute for solid returned an output multiple times


Problem description

I am very new to Dagster and I can't find an answer to my question in the docs.

I have 2 solids: one that yields tuples (str, str) parsed from an XML file, and another that just consumes the tuples and stores objects in the DB with the corresponding fields set. However, I am running into the error "Core compute for solid returned an output multiple times". I am pretty sure I made a fundamental mistake in my design. Could someone explain how to design this pipeline the right way, or point me to the chapter in the docs that explains this error?


@solid(output_defs=[OutputDefinition(Tuple, 'classification_data')])
def extract_classification_from_file(context, xml_path: String) -> Tuple:
    context.log.info(f"start")
    root = ET.parse(xml_path).getroot()
    for code_node in root.findall('definition-item'):
        context.log.info(f"{code_node.find('classification-symbol').text} {code_node.find('definition-title').text}")
        yield Output((code_node.find('classification-symbol').text, code_node.find('definition-title').text), 'classification_data')


@solid()
def load_classification(context, classification_data):
    cls = CPCClassification.objects.create(code=classification_data[0], description=classification_data[1]).save()

@pipeline
def define_classification_pipeline():
    load_classification(extract_classification_from_file())

Recommended answer

I looked up your error in the dagster codebase, and it confirmed what I had read in the tutorial: "Output names must be unique".
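
To make the rule concrete, here is a toy model of such a uniqueness check. It is only a sketch and not Dagster's actual implementation: `collect_outputs` and `same_name_every_time` are hypothetical names, and the rows are made-up sample values.

```python
from typing import Any, Dict, Iterable, Iterator, Set, Tuple


def collect_outputs(events: Iterable[Tuple[str, Any]], declared: Set[str]) -> Dict[str, Any]:
    """Toy check: every declared output name may be emitted at most once."""
    seen: Dict[str, Any] = {}
    for name, value in events:
        if name not in declared:
            raise ValueError(f"undeclared output: {name!r}")
        if name in seen:
            # This is the situation the error message in the question describes.
            raise ValueError(f"output {name!r} returned multiple times")
        seen[name] = value
    return seen


def same_name_every_time() -> Iterator[Tuple[str, Any]]:
    # Mirrors the question: one output name reused on every loop iteration.
    for row in [("A01B", "Soil working"), ("A01C", "Planting")]:
        yield ("classification_data", row)
```

Passing `same_name_every_time()` to `collect_outputs` with `{"classification_data"}` as the declared set raises on the second row, because the name has already been seen.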

Because you yield an Output inside a for loop, using the same name each time, your Output names are most likely not unique, which matches the error you received.
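
One possible way around this, sketched here under assumptions, is to collect every parsed pair into a single list so that the named output is emitted only once. The helper `parse_classifications` and the `SAMPLE_XML` string are hypothetical; only the element names come from the question.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple

# Made-up sample document that uses the element names from the question.
SAMPLE_XML = """
<definitions>
  <definition-item>
    <classification-symbol>A01B</classification-symbol>
    <definition-title>Soil working</definition-title>
  </definition-item>
  <definition-item>
    <classification-symbol>A01C</classification-symbol>
    <definition-title>Planting</definition-title>
  </definition-item>
</definitions>
"""


def parse_classifications(xml_text: str) -> List[Tuple[str, str]]:
    """Return all (symbol, title) pairs as one list instead of one output per row."""
    root = ET.fromstring(xml_text)
    pairs = []
    for node in root.findall("definition-item"):
        symbol = node.find("classification-symbol").text
        title = node.find("definition-title").text
        pairs.append((symbol, title))
    return pairs


if __name__ == "__main__":
    print(parse_classifications(SAMPLE_XML))
```

Inside the solid, the body would then reduce to a single `yield Output(pairs, 'classification_data')`, and `load_classification` would loop over the received list to create one record per pair.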

UPDATE: Following the issue you opened with the dagster team, I tested creating Outputs dynamically at run time. It works fine if the dynamic code is defined outside of a @solid. However, when I built the dynamic data inside a @solid, intending to pass its output as solid configuration to a successor @solid, the successor did not pick up the updated structure, and I received a dagster.core.errors.DagsterInvariantViolationError.

Below is the code I used to validate yielding dynamic Outputs at run time when the dynamic data is generated outside of a solid. This may be a bit of an anti-pattern, though perhaps not if Dagster is not yet mature enough to handle the scenario you describe. Note also that I do not do anything with the yielded Output objects.

"""dagit -f dynamic_output_at_runtime.py -n dynamic_output_at_runtime"""
import random

from dagster import (
    Output,
    OutputDefinition,
    execute_pipeline,
    pipeline,
    solid,
    SystemComputeExecutionContext
)

# Create some dynamic OutputDefinition list for each execution
start = 1
stop = 100
limit = random.randint(1, 10)
random_set_of_ints = {random.randint(start, stop) for _ in range(limit)}
output_defs_runtime = [OutputDefinition(
    name=f'output_{num}') for num in random_set_of_ints]


@solid(output_defs=output_defs_runtime)
def ints_for_all(context: SystemComputeExecutionContext):
    for num in random_set_of_ints:
        out_name = f"output_{num}"
        context.log.info(f"output object name: {out_name}")
        yield Output(num, out_name)

@pipeline
def dynamic_output_at_runtime():
    x = ints_for_all()
    print(x)

if __name__ == '__main__':
    result = execute_pipeline(dynamic_output_at_runtime)
    assert result.success

Re-running this pipeline yields a different set of Outputs each time:

python dynamic_output_at_runtime.py 
_ints_for_all_outputs(output_56=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea160>, output_8=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea198>, output_58=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea1d0>, output_35=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea208>)
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_START - Started execution of pipeline "dynamic_output_at_runtime".
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Executing steps in process (pid: 9456)
 event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_START - Started execution of step "ints_for_all.compute".
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_56
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_56" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_56"], "type_check_data": [true, "output_56", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_8
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_8" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_8"], "type_check_data": [true, "output_8", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_58
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_58" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_58"], "type_check_data": [true, "output_58", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_35
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_35" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_35"], "type_check_data": [true, "output_35", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_SUCCESS - Finished execution of step "ints_for_all.compute" in 2.17ms.
 event_specific_data = {"duration_ms": 2.166192003642209}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Finished steps in process (pid: 9456) in 3.11ms
 event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_SUCCESS - Finished execution of pipeline "dynamic_output_at_runtime".

I hope this helps!
