当前位置: 首页 > news >正文

Kafka 和springboot 整合Logback日志

Spring Boot 整合 Kafka 进行日志处理是一个常见的任务,可以帮助你更好地管理和分析应用程序的日志。以下是一个基本的步骤指南,帮助你完成这个整合。

本文介绍了如何在SpringBoot应用中整合Kafka,利用Logback收集日志并发送到Kafka。详细步骤包括添加Kafka相关依赖,配置logback.xml,自定义KafkaAppender以及提供测试。

首先安装kafka.。之前已经安装过了,我们这里不再讲。,可以参考前面讲的类容:

https://blog.csdn.net/lchmyhua88/article/details/155640953?spm=1001.2014.3001.5502

接着我们按照下面的步骤开始:

  • 1.pom依赖
  • 2.logback.xml配置
  • 3.配置 kafka消费者
  • 4.监听消费消息&测试代码

1. 添加依赖

<!-- Spring Boot Starter Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Logback Kafka Appender --> <dependency> <groupId>com.github.danielwegener</groupId> <artifactId>logback-kafka-appender</artifactId> <version>0.2.0-RC2</version> </dependency> <!-- Kafka Clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>

2.配置 Logback

<configuration> <!-- 控制台输出appender --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <!-- Kafka appender --> <appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> <!-- Kafka topic名称 --> <topic>log-topic</topic> <!-- Kafka bootstrap servers --> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" /> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <!-- Kafka producer配置 --> <producerConfig>bootstrap.servers=192.168.33.10:9092</producerConfig> <producerConfig>acks=0</producerConfig> <producerConfig>linger.ms=1000</producerConfig> <producerConfig>max.block.ms=0</producerConfig> <!-- 过滤器设置(可选) --> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <!-- 根logger配置 --> <root level="INFO"> <appender-ref ref="CONSOLE"/> <appender-ref ref="KAFKA"/> </root> </configuration>

3.配置 kafka消费者,为了方便测试我们把生产者消费者都配置上

spring.kafka.bootstrap-servers=192.168.33.10:9092 #spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonDeserializer

4.注入kafka bean ,方便后面调试生产者。

@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

5.监听消费消息

@Component public class LogConsumer { @KafkaListener(topics = "log-topic",groupId = "my-group") public void listen(String message) { System.out.println("收到日志消息: " + message); } }

6.添加日志打印

@RestController @RequestMapping("/kafka") public class LogController { @Autowired private KafkaProducerServiceImpl producerService; private static final Logger logger = (Logger) LoggerFactory.getLogger(LogController.class); @GetMapping("/log") public String log() { //producerService.sendLogMessage("This is a test log message"); logger.info("这是一条测试日志消息"); logger.warn("这是一条警告日志消息"); logger.error("这是一条错误日志消息"); return "Log message sent"; } }

下面我们开始测试,通过调用接口:

控制台可以看到我写3条日志,kafka消费3条消息

这样我们就通过logback把日志收集到kafka里面。方便数据分析。当然你也可以写入es进行分析画像。

我们可以通过kafka manger控制台来监控消息。

下载kafka-manager,下载地址: https://github.com/yahoo/kafka-manager

解压后配置application.conf kafka地址:

保存后,指定8888端口启动:

bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8888

启动后打开控制面板,可以看到kafka集群节点信息和分区,消费偏移等信息:

http://192.168.33.10:8888/

详情可以去官网看看介绍:

https://github.com/yahoo/CMAK

http://www.cnnetsun.cn/news/51283.html

相关文章:

  • K8S-EFK日志收集实战指南
  • 外贸流程管理系统
  • 200万token上下文能力,并且越用越聪明!Google Research重构AI长期记忆
  • Flutter + OpenHarmony 国际化与无障碍(i18n a11y)深度实践:打造真正包容的鸿蒙应用
  • 风光储并网直流微电网Simulink仿真模型:光伏、风力与混合储能系统的集成
  • Python第三次作业
  • 44、深入探索GDB调试技巧与C/C++代码调试
  • 复盘 Git+GitHub SSH 配置:从权限报错到免密推送的全流程解决方案
  • Screenbox媒体播放器隐藏功能终极指南:从入门到精通
  • FlashAttention终极指南:突破大模型训练内存瓶颈的完整教程
  • 冒泡排序 ~ 背下来的 哭
  • 手把手教你学Simulink——机器人轨迹跟踪场景实例:基于Simulink的永磁同步电机关节空间直线轨迹跟踪控制仿真
  • 盈富宝典 通达信主图
  • 14、Python在不同场景下的应用与实践
  • X-AnyLabeling 自动数据标注保姆级教程:从安装到格式转换全流程
  • 38、深入探索bc计算器、数组及特殊编程技巧
  • vue基于Spring Boot框架的技术实现的医院住院管理系统_229p8ejv
  • 基于vue的停车场预约管理系统地图_n7nz82g6_springboot php python nodejs
  • 基于vue的宠物领养系统的设计与实现_389i5918_springboot php python nodejs
  • 基于vue的生鲜团购管理系统设计与实现优惠卷_2av6282k_springboot php python nodejs
  • React Native桌面应用交互终极指南:从点击事件到原生菜单完整教程
  • Springboot美食分享网站a73c9(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • Springboot门店运营管理系统hd158(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • Stellarium望远镜控制实战指南:从硬件连接到精准观测
  • 快速验证:基于CentOS 7.6的测试环境搭建
  • AI定价实战指南:快速构建电商智能定价系统
  • VGGT三维重建终极指南:从零开始构建你的3D世界
  • 电商网站秒开秘籍:快马AI加载优化案例
  • 15分钟快速验证:谷歌服务离线包生成器原型开发
  • 1小时搞定ElementUI原型:快马平台实战