Flink 不发出存储在 Cassandra 中的值 [英] Flink Not Emitting Value to Store in Cassandra

查看:24
本文介绍了Flink 不发出存储在 Cassandra 中的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下 POJO 课程,

I have following POJO class,

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "testKey", name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;
}

和 Mapper 代码是,

and Mapper code is,

DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class)));

 env.execute();
 
 CassandraSink.addSink(sideOutput)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();

如果没有 .getSideOutput(new OutputTag<>(contact", TypeInformation.of(Person.class))); 也行不通.

It's not working without .getSideOutput(new OutputTag<>("contact", TypeInformation.of(Person.class))); also.

sideOutput 不会发出要存储在 Cassandra 中的值.知道我哪里做错了吗?

The sideOutput is not emitting value to store in Cassandra. any idea where I am doing wrong?

推荐答案

我会说,env.execute(); 应该在管道构建之后调用,即在 CassandraSink 之后 并将摆脱侧面输出.有些人认为这样应该可行:

I would say, env.execute(); should be called after the pipeline is build, i.e. after the CassandraSink and would get rid of side output. Somethink like this should work:

DataStream<Reading> ds = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        });
 
 CassandraSink.addSink(ds)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();


 env.execute();

这篇关于Flink 不发出存储在 Cassandra 中的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆