Need to insert rows with ClickHouseIO from Apache Beam (Dataflow)


Question

I am reading from a Pub/Sub topic, which is working fine. Now I need to insert the data into a table in ClickHouse.

I am still learning, so please excuse the tardiness.


        PipelineOptions options = PipelineOptionsFactory.create();


        //PubSubToDatabasesPipelineOptions options;
        Pipeline p = Pipeline.create(options);

        PCollection<String> inputFromPubSub = p.apply(namePrefix + "ReadFromPubSub",
                PubsubIO.readStrings().fromSubscription("projects/*********/subscriptions/crypto_bitcoin.dataflow.bigquery.transactions").withIdAttribute(PUBSUB_ID_ATTRIBUTE));



        PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String item = c.element();
                //System.out.print(item);
                Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
                //System.out.print(transaction);
                c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
            }}));


        res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));

        p.run().waitUntilFinish();

My TransactionSmall.java:

import java.io.Serializable;
import java.util.Date;

public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

My table definition:

clickhouse.us-east1-b.c.staging-btc-etl.internal :) CREATE TABLE litecoin.saurabh_blocks_small (`created_date` Date DEFAULT today(), `hash` String, `number` In) ENGINE = MergeTree(created_date, (hash, number), 8192)

CREATE TABLE litecoin.saurabh_blocks_small
(
    `created_date` Date, 
    `hash` String, 
    `number` In
)
ENGINE = MergeTree(created_date, (hash, number), 8192)

I am getting an error like this:

java.lang.IllegalArgumentException: Type of @Element must match the DoFn typesaurabhReadFromPubSub2/ParMultiDo(Anonymous).output [PCollection]
    at org.apache.beam.sdk.transforms.ParDo.getDoFnSchemaInformation (ParDo.java:577)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo (ParDoTranslation.java:185)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate (ParDoTranslation.java:124)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:155)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload (ParDoTranslation.java:650)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable (ParDoTranslation.java:665)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches (PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform (Pipeline.java:282)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
    at org.apache.beam.sdk.Pipeline.replace (Pipeline.java:260)
    at org.apache.beam.sdk.Pipeline.replaceAll (Pipeline.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:315)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:301)
    at io.blockchainetl.bitcoin.Trail.main (Trail.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)


What would be the best and cleanest way to achieve this without explicitly creating objects?

Thanks

Recommended answer

This is likely happening because Beam relies on the coder specified for a PCollection when it infers that collection's schema. It seems to be unable to infer the input schema for your ClickHouseIO transform.
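To make "inferring a schema" more concrete, here is a minimal plain-Java sketch of the idea: deriving field names and types from a class by reflection. This is not Beam's implementation and uses no Beam APIs; the class name SchemaSketch and the method inferSchema are hypothetical names chosen for illustration, and the nested TransactionSmall only mirrors the fields of the class in the question.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: this is NOT Beam's schema inference, just plain reflection.
public class SchemaSketch {

    // Same field layout as the TransactionSmall class from the question.
    static class TransactionSmall {
        private Date created_dt;
        private String hash;
        private int number;
    }

    // Hypothetical helper: maps each instance field name to its simple type name.
    static Map<String, String> inferSchema(Class<?> type) {
        Map<String, String> schema = new TreeMap<>(); // sorted by name for a stable order
        for (Field field : type.getDeclaredFields()) {
            // Skip static and compiler-generated fields; they are not row data.
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            schema.put(field.getName(), field.getType().getSimpleName());
        }
        return schema;
    }

    public static void main(String[] args) {
        // prints {created_dt=Date, hash=String, number=int}
        System.out.println(inferSchema(TransactionSmall.class));
    }
}
```

The point of the sketch is that a schema is a list of named, typed fields that has to be obtained from somewhere. If nothing attached to the PCollection describes the element's fields, a transform that expects structured rows has nothing to work with.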

You can make Beam attach a schema by specifying a coder that supports schema inference, such as AvroCoder. One way is to annotate the class:

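import java.io.Serializable;
import java.util.Date;
// Package location in older Beam 2.x releases; newer releases ship AvroCoder
// in the Avro extension module (org.apache.beam.sdk.extensions.avro.coders).
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)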
public class TransactionSmall implements Serializable {

    private Date created_dt;
    private String hash;

    private int number;

    public TransactionSmall(Date created_dt, String hash, int number) {
        this.created_dt = created_dt;
        this.hash = hash;
        this.number = number;
    }
}

Alternatively, you can set the coder for the PCollection in your pipeline:

PCollection<TransactionSmall> res = inputFromPubSub.apply(namePrefix + "ReadFromPubSub", ParDo.of(new DoFn<String, TransactionSmall>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String item = c.element();
        Transaction transaction = JsonUtils.parseJson(item, Transaction.class);
        c.output(new TransactionSmall(new Date(),transaction.getHash(), 123));
     }}))
    .setCoder(AvroCoder.of(TransactionSmall.class));


res.apply(ClickHouseIO.<TransactionSmall>write("jdbc:clickhouse://**.**.**.**:8123/litecoin?password=*****", "****"));
