清华主页 - 清华新闻 - 综合时讯 - 正文

JAVA(SpringBoot)集成Kafka发送和接收消息

SpringBoot集成Kafka发送和接收消息。

  • 一、Kafka 简介。
  • 二、Kafka 功能。
  • 三、POM依赖。
  • 四、配置文件。
  • 五、生产者。
  • 六、消费者。

君子之学贵一,一则明,明则有功。

一、Kafka 简介。

Kafka 是由 Apache 软件基金会开发的开源流处理平台,最初由 LinkedIn 公司开发,并于 2011 年开源。它是一种高吞吐量的分布式发布 - xff0订阅消息系统c;它具有可持续性、高吞吐、低延迟、高容错等特点。
Kafka 主要生产者(Producer)、#xff08消费者;Consumer)、主题(Topic)、分区(Partition)和代理(Broker)等组件组成。生产者负责向生产者发送数据 Kafka 集群,消费者从集群中读取数据。主题是逻辑分类,将数据发送到特定的主题。每个主题可以分为多个分区,并行处理数据,提高系统的可扩展性。代理则是 Kafka 集群中的服务器节点,负责接收和存储生产者发送的数据,并为消费者提供数据读取服务。

二、Kafka 功能。

信息队列功能:Kafka 可作为消息队列使用,在应用程序之间传递信息。生产者将消息发送到主题,不同的消费者可以从主题中订阅和消费新闻,实现应用程序解耦。例如,在电子商务系统中,订单生成模块可以将订单消息发送到 Kafka 主题,后续的库存管理、物流配送等模块可以从主题消费订单信息,独立处理,降低模块之间的耦合度。
xff1的数据存储功能a;Kafka 具有持久存储能力,它在磁盘上存储信息数据,并通过多副本机制保证数据的可靠性。即使某个节点出现故障󿀌数据不会丢失。这一特性使得 Kafka 不仅可以作为消息队列,也可用于数据的长期存储和备份,例如,存储系统的操作日志,便于后续数据分析和故障排查。
流处理功能:Kafka 可与流处理框架(如 Apache Flink、Spark Streaming 等)集成,处理实时数据流。通过将实时数据发送到 Kafka 主题,从主题中读取数据,实时计算、分析和转换流程处理框架。例如,࿰在实时监控系统中c;通过 Kafka 收集服务器性能指标数据,然后用流处理框实时分析这些数据,及时发现性能异常并发出报警。

三、POM依赖。

            org.springframework.kafka        spring-kafka        2.8.11    

四、配置文件。

spring。:。#。 Kafka。配置  kafka。:。#。 Kafka。服务器地址及端口 代理地址�可以多个    bootstrap。-。servers。:。IP。:。9092。# 生产者配置    producer。:。# 发送失败时的重试次数      retries。:。3。# 每次批量发送的消息数量,小值调整      batch。-。size。:。1。# 生产者缓冲区的大小      buffer。-。memory。:。33554432。# 消息 key 序列化器,将 key 序列化为字节数组      key。-。serializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringSerializer。# 消息 value 序列化器,将消息序列化为字节数组      value。-。serializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringSerializer。# 消费者配置    consumer。:。# 当没有初始偏移量或当前偏移量不存在时,,消费从最早的消息开始      auto。-。offset。-。reset。:。earliest      # 偏移量是否自动提交      enable。-。auto。-。commit。:。true。# 自动提交偏差的时间间隔(毫秒),延长自动提交时间间隔      auto。-。commit。-。interval。:。1000。# 消息 key 反序列化器,将字节数组反序列化为 key      key。-。deserializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringDeserializer。# 消息 value 反序列化器,将字节数组反序列化为消息体      value。-。deserializer。:。org。.。apache。.。kafka。.。common。.。serialization。.。StringDeserializer。

五、生产者。

import。lombok。.。extern。.。slf4j。.。Slf4j。;import。org。.。springframework。.。beans。.。factory。.。annotation。.。Autowired。;import。org。.。springframework。.。kafka。.。core。.。KafkaTemplate。;import。org。.。springframework。.。kafka。.。support。.。SendResult。;import。org。.。springframework。.。stereotype。.。Component。;import。org。.。springframework。.。util。.。concurrent。.。ListenableFuture。;import。org。.。springframework。.。util。.。concurrent。.。ListenableFutureCallback。;/** * 生产者 * * @author chenlei */。@Slf4j。@Component。public。class。KafkaProducer。{ 。// 打印收到的信息的详细信息。log。.。info。(。"接收到 Kafka 消息: 主题 = { }, 分区 = { }, 偏移量 = { }, 键 = { }, 值 = { }",record。.。topic。(。)。,record。.。partition。(。)。,record。.。offset。(。)。,record。.。key。(。)。,record。.。value。(。)。)。;}。}。

2025-06-24 12:17:54

相关新闻

清华大学新闻中心版权所有,清华大学新闻网编辑部维护,电子信箱: news@tsinghua.edu.cn
Copyright 2001-2020 news.tsinghua.edu.cn. All rights reserved.