SpringBoot集成Kafka发送和接收消息。
- 一、Kafka 简介。
- 二、Kafka 功能。
- 三、POM依赖。
- 四、配置文件。
- 五、生产者。
- 六、消费者。
君子之学贵一,一则明,明则有功。。一、Kafka 简介。
Kafka 是由 Apache 软件基金会开发的开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - xff0订阅消息系统c;它具有可持续性、高吞吐、低延迟、高容错等特点。
Kafka 主要生产者(Producer)、#xff08消费者;Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件组成。生产者负责向生产者发送数据 Kafka 集群,消费者从集群中读取数据。主题是逻辑分类,将数据发送到特定的主题。每个主题可以分为多个分区,并行处理数据,提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。
二、Kafka 功能。
信息队列功能:Kafka 可作为消息队列使用,在应用程序之间传递信息。生产者将消息发送到主题,不同的消费者可以从主题中订阅和消费新闻,实现应用程序解耦。例如,在电子商务系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从主题消费订单信息,独立处理,降低模块之间的耦合度。
xff1的数据存储功能a;Kafka 具有持久存储能力,它在磁盘上存储信息数据,并通过多副本机制保证数据的可靠性。即使某个节点出现故障数据不会丢失。这一特性使得 Kafka 不仅可以作为消息队列,也可用于数据的长期存储和备份,例如,存储系统的操作日志,便于后续数据分析和故障排查。
流处理功能:Kafka 可与流处理框架(如 Apache Flink、Spark Streaming 等)集成,处理实时数据流。通过将实时数据发送到 Kafka 主题,从主题中读取数据,实时计算、分析和转换流程处理框架。例如,在实时监控系统中c;通过 Kafka 收集服务器性能指标数据,然后用流处理框实时分析这些数据,及时发现性能异常并发出报警。
三、POM依赖。
org.springframework.kafka spring-kafka 2.8.11
四、配置文件。
spring。:。#。 Kafka。配置 kafka。:。#。 Kafka。服务器地址及端口 代理地址可以多个 bootstrap。-。servers。:。IP。:。9092。# 生产者配置 producer。:。# 发送失败时的重试次数 retries。:。3。# 每次批量发送的消息数量,小值调整 batch。-。size。:。1。# 生产者缓冲区的大小 buffer。-。memory。:。33554432。# 消息 key 序列化器,将 key 序列化为字节数组 key。-。serializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringSerializer。# 消息 value 序列化器,将消息序列化为字节数组 value。-。serializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringSerializer。# 消费者配置 consumer。:。# 当没有初始偏移量或当前偏移量不存在时,,消费从最早的消息开始 auto。-。offset。-。reset。:。earliest # 偏移量是否自动提交 enable。-。auto。-。commit。:。true。# 自动提交偏差的时间间隔(毫秒),延长自动提交时间间隔 auto。-。commit。-。interval。:。1000。# 消息 key 反序列化器,将字节数组反序列化为 key key。-。deserializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringDeserializer。# 消息 value 反序列化器,将字节数组反序列化为消息体 value。-。deserializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringDeserializer。
五、生产者。
import。lombok。.。extern。.。slf4j。.。Slf4j。;import。org。.。springframework。.。beans。.。factory。.。annotation。.。Autowired。;import。org。.。springframework。.。kafka。.。core。.。KafkaTemplate。;import。org。.。springframework。.。kafka。.。support。.。SendResult。;import。org。.。springframework。.。stereotype。.。Component。;import。org。.。springframework。.。util。.。concurrent。.。ListenableFuture。;import。org。.。springframework。.。util。.。concurrent。.。ListenableFutureCallback。;/** * 生产者 * * @author chenlei */。@Slf4j。@Component。public。class。KafkaProducer。{ 。// 打印收到的信息的详细信息。log。.。info。(。"接收到 Kafka 消息: 主题 = { }, 分区 = { }, 偏移量 = { }, 键 = { }, 值 = { }",record。.。topic。(。)。,record。.。partition。(。)。,record。.。offset。(。)。,record。.。key。(。)。,record。.。value。(。)。)。;}。}。