How to use "li-apache-kafka-clients" in a Spring Boot app to send large messages (above 1 MB) from a Kafka producer?


Question

How can I use li-apache-kafka-clients in a Spring Boot app to send large messages (above 1 MB) from a Kafka producer to a Kafka consumer? The GitHub repository for li-apache-kafka-clients is: https://github.com/linkedin/li-apache-kafka-clients

I have imported the li-apache-kafka-clients .jar file and added the following configuration for the producer:

props.put("large.message.enabled", "true");
props.put("max.message.segment.bytes", 1000 * 1024);
props.put("segment.serializer", DefaultSegmentSerializer.class.getName());

and for the consumer:

message.assembler.buffer.capacity, 
max.tracked.messages.per.partition, 
exception.on.message.dropped, 
segment.deserializer.class

but I am still getting an error for large messages. Please help me resolve it.

Below is my code. Please let me know where I need to create the LiKafkaProducer:

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;


    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfig() {
        // TODO Auto-generated method stub
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put("bootstrap.servers", "localhost:9092");
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("batch.size", 16384);
        config.put("linger.ms", 1);
        config.put("buffer.memory", 33554432);

        // The following properties are used by LiKafkaProducerImpl
        config.put("large.message.enabled", "true");
        config.put("max.message.segment.bytes", 1000 * 1024);
        config.put("segment.serializer", DefaultSegmentSerializer.class.getName());
        config.put("auditor.class", LoggingAuditor.class.getName());

        return config;
    }
}

@RestController
@RequestMapping("/kafkaProducer")
public class KafkaProducerController {

    @Autowired
    private KafkaSender sender;

    @PostMapping
    public ResponseEntity<List<Student>> sendData(@RequestBody List<Student> student){

        sender.sendData(student);
        return new ResponseEntity<List<Student>>(student, HttpStatus.OK);
    }
}

@Service
public class KafkaSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topicName;

    public void sendData(List<Student> student) {

        // TODO Auto-generated method stub
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, topicName);
        headers.put("payload", student.get(0));

        // Construct a JSONObject from a Map.
        JSONObject HeaderObject = new JSONObject(headers);
        System.out.println("\nMethod-2: Using new JSONObject() ==> " + HeaderObject);
        final String record = HeaderObject.toString();

        Message<String> message = MessageBuilder.withPayload(record).setHeader(KafkaHeaders.TOPIC, topicName)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "Message")
                .build();

        kafkaTemplate.send(topicName, message.toString());
    }
}

Recommended answer

The LinkedIn-specific properties have no effect on the plain KafkaProducer that DefaultKafkaProducerFactory creates, so you would need to implement your own ConsumerFactory and ProducerFactory that create a LiKafkaConsumer and a LiKafkaProducer respectively.
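For background on why a dedicated producer and consumer are needed: li-apache-kafka-clients sends a large message as several smaller segments and puts them back together on the consumer side. The following is only a rough, library-independent illustration of that idea using the JDK alone. The class and method names (SegmentSketch, split, reassemble) are made up for this sketch and are not the library's API, and the real implementation also attaches metadata to each segment so the consumer can match segments to their message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not code from li-apache-kafka-clients.
public class SegmentSketch {

    // Split a payload into chunks of at most maxSegmentBytes each.
    public static List<byte[]> split(byte[] payload, int maxSegmentBytes) {
        if (maxSegmentBytes <= 0) {
            throw new IllegalArgumentException("maxSegmentBytes must be positive");
        }
        List<byte[]> segments = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += maxSegmentBytes) {
            int end = Math.min(payload.length, offset + maxSegmentBytes);
            segments.add(Arrays.copyOfRange(payload, offset, end));
        }
        return segments;
    }

    // Concatenate the segments, in order, back into one payload.
    public static byte[] reassemble(List<byte[]> segments) {
        int total = 0;
        for (byte[] segment : segments) {
            total += segment.length;
        }
        byte[] out = new byte[total];
        int pos = 0;
        for (byte[] segment : segments) {
            System.arraycopy(segment, 0, out, pos, segment.length);
            pos += segment.length;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[3 * 1024 * 1024]; // a 3 MiB message
        Arrays.fill(payload, (byte) 7);
        // Same numeric value as max.message.segment.bytes in the question.
        List<byte[]> segments = split(payload, 1000 * 1024);
        System.out.println("segments: " + segments.size());
        System.out.println("round trip ok: " + Arrays.equals(payload, reassemble(segments)));
    }
}
```

With a segment limit of 1000 * 1024 bytes, a 3 MiB payload ends up as four segments, each small enough to fit under a typical 1 MB broker limit. A standard KafkaProducer does none of this splitting, which is why it still rejects the oversized record.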

You should be able to subclass the default factories provided by the framework.
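A minimal sketch of what such a subclass might look like on the producer side is shown here. It rests on several assumptions that should be checked against the versions in use: that the spring-kafka release exposes the protected createKafkaProducer() hook on DefaultKafkaProducerFactory, that LiKafkaProducerImpl accepts a Properties object in its constructor (as in the library's README), and that the library's Kafka dependency is compatible with the one spring-kafka pulls in. The class name LiKafkaProducerFactory is hypothetical and not part of either library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import com.linkedin.kafka.clients.producer.LiKafkaProducerImpl;

// Hypothetical class name; an untested sketch, not an official integration.
public class LiKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {

    // Own copy of the configuration, including the large-message properties.
    private final Map<String, Object> liConfigs;

    public LiKafkaProducerFactory(Map<String, Object> configs) {
        super(configs);
        this.liConfigs = new HashMap<>(configs);
    }

    // Assumption: this hook exists and is overridable in the spring-kafka version in use.
    @Override
    protected Producer<K, V> createKafkaProducer() {
        Properties props = new Properties();
        props.putAll(liConfigs);
        // Assumption: LiKafkaProducerImpl(Properties) is available in the library version in use.
        return new LiKafkaProducerImpl<>(props);
    }
}
```

In the question's KafkaProducerConfig, the producerFactory() bean would then return new LiKafkaProducerFactory<>(producerConfig()) instead of DefaultKafkaProducerFactory, and the KafkaTemplate bean could stay as it is. The consumer side would need an analogous ConsumerFactory that creates a LiKafkaConsumer, since the segments have to be reassembled there.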
