Parsing JSON data using Apache Kafka Streaming

Problem description

I have a scenario where I read JSON data from my Kafka topic, and using Kafka 0.11 I need to write Java code that streams the JSON data present in the topic. My input is JSON data containing an array of dictionaries.

My requirement is to get the "text" field (a key in each dictionary contained in the array of the JSON data) and pass all those text tweets to another topic through Kafka Streams.

I have written code up to this point. Please help me parse the data.

final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();

// Read each record from the "Persons" topic as a raw JsonNode.
KStream<String, JsonNode> personstwitter = builder.stream(Serdes.String(), jsonSerde, "Persons");

// Forward the records, unchanged, to the output topic.
personstwitter.to(Serdes.String(), jsonSerde, "Persons-output");

Recommended answer

I would suggest the following to have more control over the JSON data.

  1. Write a serializer and a deserializer.
  2. Create a POJO based on the JSON string; a POJO is the best way to have more control over the data.
  3. Map the data to the POJO to access the required fields (a mapping sketch follows the POJO below).

POJO:

import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonRootName;

@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    public Person() {
    }

    @JsonCreator
    public Person(@JsonProperty("name") String name, @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country, @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    // getters and setters stripped
}
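
Step 3 of the list above is plain Jackson: once the POJO exists, an ObjectMapper maps the raw JSON onto it. A minimal sketch, with a made-up input string whose field names match the @JsonProperty names:

import com.fasterxml.jackson.databind.ObjectMapper;

public class PersonMappingExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical input; the field names match the POJO's
        // @JsonProperty annotations.
        String json = "{\"name\":\"Jane\",\"personalID\":\"42\","
                + "\"country\":\"IN\",\"occupation\":\"engineer\"}";

        ObjectMapper mapper = new ObjectMapper();
        Person person = mapper.readValue(json, Person.class);

        // Assumes the stripped getters exist on the POJO.
        System.out.println(person.getCountry()); // prints IN
    }
}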

Serializer:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // Serialize the object to its JSON representation as UTF-8 bytes.
            return om.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

Deserializer:

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by Kafka when the deserializer is
     * instantiated reflectively from the consumer configuration.
     */
    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // If the target class was not set via the constructor, read it
        // from the config map (a convention of this class, keyed "type").
        if (type == null) {
            type = (Class<T>) map.get("type");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            // Map the JSON bytes onto the target POJO class.
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    protected Class<T> getType() {
        return type;
    }
}
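
A note on wiring: the no-arg constructor plus configure() path matters when Kafka instantiates the deserializer reflectively from configuration; the target class then has to arrive through the config map under the "type" key, which is a convention of this class rather than a Kafka setting. A small sketch of both wiring styles:

import java.util.HashMap;
import java.util.Map;

public class DeserializerWiringExample {

    public static void main(String[] args) {
        // Style 1: pass the target class directly.
        JsonDeserializer<Person> direct = new JsonDeserializer<>(Person.class);

        // Style 2: no-arg construction plus configure(), mirroring what
        // happens when Kafka builds the deserializer from properties.
        Map<String, Object> config = new HashMap<>();
        config.put("type", Person.class);

        JsonDeserializer<Person> configured = new JsonDeserializer<>();
        configured.configure(config, false); // false = value (not key) deserializer
    }
}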

Consumer:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ConsumerUtilities {

    public static Properties getProperties() {
        Properties configs = new Properties();
        // The application id is also used to name internal topics,
        // so it must not contain spaces.
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-test-application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        return new KStreamBuilder();
    }

    public static void getStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer);

        KStreamBuilder builder = getStreamingConsumer();

        // Read Person records from the "test" topic and print a field
        // from each one as it arrives.
        KStream<String, Person> kStream = builder.stream(Serdes.String(), personSerde, "test");
        kStream.foreach(new ForeachAction<String, Person>() {

            @Override
            public void apply(String key, Person person) {
                System.out.println(person.getCountry());
            }
        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }
}
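
What the answer does not show is the step the question actually asks about: pulling the "text" field out of each dictionary in the incoming array and forwarding those tweets to another topic. Below is a minimal sketch of that step, reusing the generic JsonSerializer/JsonDeserializer defined above (with JsonNode as the target type) and the getProperties() helper from ConsumerUtilities; the exact array-of-dictionaries layout of the payload is an assumption based on the question:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import com.fasterxml.jackson.databind.JsonNode;

public class TweetTextExtractor {

    public static void main(String[] args) {
        // JsonNode serde built from the generic serializer/deserializer
        // defined in the answer above.
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(
                new JsonSerializer<JsonNode>(),
                new JsonDeserializer<>(JsonNode.class));

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, JsonNode> tweets =
                builder.stream(Serdes.String(), jsonSerde, "Persons");

        // Assumption about the payload shape: each record is a JSON array
        // of dictionaries, each carrying a "text" key. Emit one output
        // record per "text" value found.
        KStream<String, String> texts = tweets.flatMapValues(value -> {
            List<String> result = new ArrayList<>();
            if (value != null && value.isArray()) {
                for (JsonNode element : value) {
                    JsonNode text = element.get("text");
                    if (text != null) {
                        result.add(text.asText());
                    }
                }
            }
            return result;
        });

        texts.to(Serdes.String(), Serdes.String(), "Persons-output");

        KafkaStreams streams = new KafkaStreams(builder, ConsumerUtilities.getProperties());
        streams.start();
    }
}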

Producer:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerUtilities {

    public static Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The keys are plain strings, so use the String serializer (the
        // original ByteArraySerializer would not match the
        // Producer<String, Person> key type).
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.api.serdes.JsonSerializer");

        return new KafkaProducer<String, Person>(configProperties);
    }

    public ProducerRecord<String, Person> createRecord(Person person) {
        // No key is set, so records are spread round-robin across partitions.
        return new ProducerRecord<String, Person>("test", person);
    }
}
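
For completeness, a short sketch of how the two producer helpers might be called together (note that createRecord is an instance method in the posted code, so it needs a ProducerUtilities instance):

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {

    public static void main(String[] args) {
        Person person = new Person("Jane", "42", "IN", "engineer");

        Producer<String, Person> producer = ProducerUtilities.getProducer();
        ProducerRecord<String, Person> record = new ProducerUtilities().createRecord(person);

        // send() is asynchronous and returns a Future<RecordMetadata>;
        // flush() forces delivery before closing.
        producer.send(record);
        producer.flush();
        producer.close();
    }
}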
