Need to insert rows in ClickHouseIO from Apache Beam (Dataflow)


Problem description


I am reading from a Pub/Sub topic, which is running fine. Now I need to insert the data into a table on ClickHouse.

I am still learning, so please excuse any rough edges.


        PipelineOptions options = PipelineOptionsFactory.create();
        //PubSubToDatabasesPipelineOptions options;
        Pipeline p = Pipeline.create(options);

        PCollection<String> inputFromPubSub = p.apply(namePrefix + "ReadFromPubSub",
                PubsubIO.readStrings().fromSubscription("projects/*********/subscriptions/crypto_bitcoin.dataflow.bigquery.transactions").withIdAttribute(PUBSUB_ID_ATTRIBUTE));

        PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String item = c.element();
                //System.out.print(item);
                Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
                //System.out.print(transaction);
                c.output(new TransactionSmall(new Date(), transaction.getHash(), 123));
            }}));

        res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));

        p.run().waitUntilFinish();

My TransactionSmall.java

import java.io.Serializable;
import java.util.Date;

public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

My table definition

clickhouse.us-east1-b.c.staging-btc-etl.internal :) CREATE TABLE litecoin.saurabh_blocks_small (`created_date` Date DEFAULT today(), `hash` String, `number` In) ENGINE = MergeTree(created_date, (hash, number), 8192)

CREATE TABLE litecoin.saurabh_blocks_small
(
    `created_date` Date, 
    `hash` String, 
    `number` In
)
ENGINE = MergeTree(created_date, (hash, number), 8192)

I am getting an error like:

java.lang.IllegalArgumentException: Type of @Element must match the DoFn type
saurabhReadFromPubSub2/ParMultiDo(Anonymous).output [PCollection]
    at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation (ParDo.java:577)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo (ParDoTranslation.java:185)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate (ParDoTranslation.java:124)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:155)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload (ParDoTranslation.java:650)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable (ParDoTranslation.java:665)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches (PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform (Pipeline.java:282)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
    at org.apache.beam.sdk.Pipeline.replace (Pipeline.java:260)
    at org.apache.beam.sdk.Pipeline.replaceAll (Pipeline.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:315)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:301)
    at io.blockchainetl.bitcoin.Trail.main (Trail.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)


What would be the best and cleanest way to achieve this without explicitly creating objects?

Thanks

Solution

This is likely happening because Beam relies on the coder specification of a PCollection when it infers a schema for it. It seems to be having trouble inferring the input schema for your ClickHouseIO transform.

You can compel Beam to attach a schema by specifying a coder that supports schema inference, such as AvroCoder. You'd do:

import java.io.Serializable;
import java.util.Date;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}
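One caveat worth noting: AvroCoder decodes elements by instantiating the element class reflectively, so the POJO generally also needs a no-arg constructor (it may be private) once a parameterized one is declared. Below is a stdlib-only sketch of that reflective instantiation, using a local stand-in for the POJO above; no Beam or Avro dependency is involved, the coder's behavior is only emulated:

```java
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.Date;

public class Main {
    // Stand-in mirroring the question's TransactionSmall POJO.
    static class TransactionSmall implements Serializable {
        private Date created_dt;
        private String hash;
        private int number;

        // No-arg constructor: reflective instantiation fails without one
        // as soon as any other constructor is declared.
        private TransactionSmall() {}

        TransactionSmall(Date created_dt, String hash, int number) {
            this.created_dt = created_dt;
            this.hash = hash;
            this.number = number;
        }
    }

    public static void main(String[] args) throws Exception {
        // Emulate what a reflection-based decoder does: look up and invoke
        // the no-arg constructor, then populate fields afterwards.
        Constructor<TransactionSmall> ctor = TransactionSmall.class.getDeclaredConstructor();
        ctor.setAccessible(true);
        TransactionSmall t = ctor.newInstance();
        System.out.println(t.getClass().getSimpleName());
    }
}
```

If the lookup in this sketch throws NoSuchMethodException for your class, a reflection-based coder would likewise fail to materialize decoded elements.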

Or you can also set the coder for the PCollection on your pipeline:

PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String item = c.element();
        Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
        c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
     }}))
    .setCoder(AvroCoder.of(TransactionSmall.class));


res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));
