kafka简单的入门案例 kafka应用实例( 六 )


application.yml配置如下:server: port: 8080spring: kafka:bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer: # 生产者retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种# MANUAL_IMMEDIATEack-mode: manual_immediate生产者代码:package com.yundasys.usercenter.collect.api.vo.req;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;/** * @program: usercenter-portrait-collect * @description: KafkaController * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 创建文件, yxh-word, 2021-07-14 **/public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}消费者代码:package com.yundasys.usercenter.collect.api.vo.req;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;/** * @program: usercenter-portrait-collect * @description: MyConsumer * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 创建文件, yxh-word, 2021-07-14 **/public class MyConsumer {/*** @KafkaListener(groupId = "testGroup", topicPartitions = {*@TopicPartition(topic = "topic1", partitions = {"0", "1"}),*@TopicPartition(topic = "topic2", partitions = "0",*partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))*},concurrency = "6")*//concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数* @param record*/@KafkaListener(topics = "my-replicated-topic",groupId = "yundaGroup")public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://www.520longzhigu.com/diannao/record.value();System.out.println(value);System.out.println(record);//手动提交offsetack.acknowledge();}/*//配置多个消费组@KafkaListener(topics ="my-replicated-topic",groupId = "likeGroup")public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://www.520longzhigu.com/diannao/record.value();System.out.println(value);System.out.println(record);ack.acknowledge();}*/}


以上关于本文的内容,仅作参考!温馨提示:如遇健康、疾病相关的问题,请您及时就医或请专业人士给予相关指导!

「四川龙网」www.sichuanlong.com小编还为您精选了以下内容,希望对您有所帮助: