How to use "li-apache-kafka-clients" in a Spring Boot app to send large messages (above 1 MB) from a Kafka producer?
Problem description
How can I use li-apache-kafka-clients in a Spring Boot app to send large messages (above 1 MB) from a Kafka producer to a Kafka consumer? Here is the GitHub link for li-apache-kafka-clients: https://github.com/linkedin/li-apache-kafka-clients
I have imported the li-apache-kafka-clients .jar file and added the following configuration for the producer:
props.put("large.message.enabled", "true");
props.put("max.message.segment.bytes", 1000 * 1024);
props.put("segment.serializer", DefaultSegmentSerializer.class.getName());
and for the consumer:
message.assembler.buffer.capacity,
max.tracked.messages.per.partition,
exception.on.message.dropped,
segment.deserializer.class
but I still get an error for large messages. Please help me resolve it.
Below is my code. Please let me know where I need to create the LiKafkaProducer:
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfig() {
        // TODO Auto-generated method stub
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put("bootstrap.servers", "localhost:9092");
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("batch.size", 16384);
        config.put("linger.ms", 1);
        config.put("buffer.memory", 33554432);
        // The following properties are used by LiKafkaProducerImpl
        config.put("large.message.enabled", "true");
        config.put("max.message.segment.bytes", 1000 * 1024);
        config.put("segment.serializer", DefaultSegmentSerializer.class.getName());
        config.put("auditor.class", LoggingAuditor.class.getName());
        return config;
    }
}
@RestController
@RequestMapping("/kafkaProducer")
public class KafkaProducerController {

    @Autowired
    private KafkaSender sender;

    @PostMapping
    public ResponseEntity<List<Student>> sendData(@RequestBody List<Student> student){
        sender.sendData(student);
        return new ResponseEntity<List<Student>>(student, HttpStatus.OK);
    }
}
@Service
public class KafkaSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topicName;

    public void sendData(List<Student> student) {
        // TODO Auto-generated method stub
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, topicName);
        headers.put("payload", student.get(0));
        // Construct a JSONObject from a Map.
        JSONObject HeaderObject = new JSONObject(headers);
        System.out.println("\nMethod-2: Using new JSONObject() ==> " + HeaderObject);
        final String record = HeaderObject.toString();
        Message<String> message = MessageBuilder.withPayload(record).setHeader(KafkaHeaders.TOPIC, topicName)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "Message")
                .build();
        kafkaTemplate.send(topicName, message.toString());
    }
}
Recommended answer
You would need to implement your own ConsumerFactory and ProducerFactory to create the LiKafkaConsumer and LiKafkaProducer, respectively.
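Some background on why both sides need the Li clients: the plain KafkaProducer that DefaultKafkaProducerFactory creates does not recognize properties such as large.message.enabled or max.message.segment.bytes, so nothing splits the payload before it is sent. The sketch below is only a conceptual illustration of segmenting and reassembly using the JDK, not the library's actual implementation; the class and method names (SegmentSketch, split, assemble) are hypothetical, and the real clients also track message IDs and segment order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SegmentSketch {

    /** Splits a payload into chunks of at most maxSegmentBytes bytes each. */
    public static List<byte[]> split(byte[] payload, int maxSegmentBytes) {
        if (maxSegmentBytes <= 0) {
            throw new IllegalArgumentException("maxSegmentBytes must be positive");
        }
        List<byte[]> segments = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += maxSegmentBytes) {
            int end = Math.min(payload.length, offset + maxSegmentBytes);
            segments.add(Arrays.copyOfRange(payload, offset, end));
        }
        return segments;
    }

    /** Concatenates segments, in the order given, back into one payload. */
    public static byte[] assemble(List<byte[]> segments) {
        int total = 0;
        for (byte[] segment : segments) {
            total += segment.length;
        }
        byte[] payload = new byte[total];
        int position = 0;
        for (byte[] segment : segments) {
            System.arraycopy(segment, 0, payload, position, segment.length);
            position += segment.length;
        }
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[3 * 1024 * 1024];   // a 3 MB value
        Arrays.fill(payload, (byte) 'x');
        int maxSegmentBytes = 1000 * 1024;            // same value as in the question

        List<byte[]> segments = split(payload, maxSegmentBytes);
        System.out.println("segments: " + segments.size());
        System.out.println("round trip ok: " + Arrays.equals(payload, assemble(segments)));
    }
}
```

Each segment stays below the broker's default message size limit, which is why the consumer needs a matching client that can buffer and reassemble the pieces.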
You should be able to subclass the default factories provided by the framework.
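A minimal sketch of the producer side, under several assumptions that should be checked against the versions in use: that spring-kafka is 2.5 or later, where DefaultKafkaProducerFactory exposes a protected createRawProducer(Map) hook (older versions expose createKafkaProducer() instead); that LiKafkaProducerImpl offers a constructor taking the config map; and that the kafka-clients version pulled in by li-apache-kafka-clients is compatible with the one spring-kafka expects. The class name LiKafkaProducerFactory is hypothetical.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import com.linkedin.kafka.clients.producer.LiKafkaProducerImpl;

/**
 * Hypothetical factory that hands Spring a LiKafkaProducer
 * instead of a plain KafkaProducer.
 */
public class LiKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {

    public LiKafkaProducerFactory(Map<String, Object> configs) {
        super(configs);
    }

    // Assumption: spring-kafka 2.5+ calls this hook to build the underlying producer.
    @Override
    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        // Assumption: LiKafkaProducerImpl reads the serializers and the
        // large-message properties (large.message.enabled, etc.) from the map.
        return new LiKafkaProducerImpl<>(rawConfigs);
    }
}
```

With a factory like this, the producerFactory() bean in KafkaProducerConfig would return new LiKafkaProducerFactory<>(producerConfig()) instead of new DefaultKafkaProducerFactory<>(producerConfig()), and the existing KafkaTemplate bean could stay as it is. The consumer side would need the same treatment with a ConsumerFactory that creates a LiKafkaConsumer.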