当前位置: 首页 > news >正文

Flume日志采集简介

一、flume分布

flume的核心组件分为数据源source、管道channel和目的地sink。

  • source:对接数据源
  • channel:用于中间缓存数据
  • sink:对接目的地

在整个数据传输的过程中,流动的是 event,它是 Flume 内部数据传输的最基本单元。

event 将传输的数据进行封装。如果是文本文件,通常是一行记录,event 也是事务的基本单位。event 从 source,流向 channel,再到 sink,本身为一个字节数组,并可携带 headers(头信息)信息。event 代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

一个完整的 event 包括:event headers、event body、event 信息,其中event 信息就是 flume 收集到的日记记录。

二、flume的结构

flume的内部结构分为标准和串联

  • 标准结构:单台flume传输,单管道,单目的地。
  • 串联:多台flume传输给一台flume的数据源,然后这台进行汇总传输到最终目的地(hdfs)。

flume的原理:

  • ​ flume本身是一个Java进程,启动以后会生成一个叫agent的进程。
  • ​ agent进程中包含了source,sink和channel。
  • ​ 在flume中,数据被包装为event。真实的数据存放在event body中
    • event是flume中最小的数据单元

三、flume的安装与部署

flume官网:(https://flume.apache.org/)

Flume 1.11.0 用户指南 — Apache Flume

flume部署步骤:

  • 解压
  • 配置环境变量
  • 改名flume-env.sh、在flume-env.sh输入Java路径
  • 删除lib下guava可能导致的版本冲突问题
  • 编辑conf文件
  • 启动flume

flume监视端口案例(官网案例)

# example.conf: A single-node Flume configuration# 定义agent组件的名称a1.sources=r1 a1.sinks=k1 a1.channels=c1# 定义sources组件属性:r1# 设定type为netcat,用处为监听端口的数据,转换log为英文文本a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=44444# 定义sinks组件属性:k1# 定义type打印方式:这里为logger,即屏幕上(还可为hdfs等类型)a1.sinks.k1.type=logger# 定义channels组件属性:c1a1.channels.c1.type=memory# 设定最多存放1000个、设定事务容量为100个(一起成功或失败)a1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100# 设置sources和sinks的管道(即channels)a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1

flume启动案例

flume-ng agent-nagent-c$FLUME_HOME/conf-f$FLUME_HOME/conf/flume-properties.conf-Dflume.root.logger=INFO,console

flume监视文件目录案例

a1.sources=r1a1.sinks=k1a1.channels=c1
  • sources
# 设定sources属性:r1# 设定sources的type为spooldir可以监视文件目录# 注意不能有同名的文件在hdfs下产生,否则报错罢工,所以一般使用时间戳来命名进行采集数据a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/root/dirlogs a1.sources.r1.fileHeader=true
  • sinks
# 定义sinks组件属性:k1# 定义type打印方式:这里为hdfsa1.sinks.k1.type=hdfs# 设置动态获取当前时间a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.path=/tmp/flume/%y-%m-%d/%H:%M/%S# filePrefix设定文件前缀a1.sinks.k1.hdfs.filePrefix=-events# fileSuffix设置文件后缀a1.sinks.k1.hdfs.fileSuffix=.log# 设置文件类型为datastream普通文件,默认为序列化文件a1.sinks.k1.hdfs.fileType=Datastream# round 控制文件夹以多少时间滚动、是否开启时间舍弃# 这里设置为10分钟,则10分钟内采集的文件都会在一个文件夹下面a1.sinks.k1.hdfs.round=truea1.sinks.k1.hdfs.roundValue=10a1.sinks.k1.hdfs.roundUnit=minute# roll 控制写入hdfs文件,以何种方式滚动a1.sinks.k1.hdfs.rollInterval=3# 以时间间隔、默认30秒a1.sinks.k1.hdfs.rollSize=20# 以文件大小、默认1024a1.sinks.k1.hdfs.rollCount=5# 以event数量、默认10个# 如果三个都设置,谁先满足谁触发# 如果不想以某种属性为滚动,设置为0即可
  • channels
# 定义channels组件属性:c1# 设定使用内存来缓存a1.channels.c1.type=memory# 设定最多存放1000个、设定事务容量为100个event(一起成功或失败)a1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100
  • 设定连接
# 设置sources和sinks的管道(即channels)a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1

flume监视文件日志(实时监视、实时更新)

a1.sources=r1a1.sinks=k1a1.channels=c1
  • sources
# 设定sources属性:r1# 设置exec可以运行cat或者tail -F命令的结果作为数据进行收集a1.sources.r1.type=execa1.sources.r1.command=cat/root/logs/tests.log
  • sinks
# 定义sinks组件属性:k1a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.path=/tmp/flume/%y-%m-%d/%H:%M/%S a1.sinks.k1.hdfs.filePrefix=-eventsa1.sinks.k1.hdfs.fileSuffix=.log a1.sinks.k1.hdfs.fileType=Datastream a1.sinks.k1.hdfs.round=truea1.sinks.k1.hdfs.roundValue=10a1.sinks.k1.hdfs.roundUnit=minute a1.sinks.k1.hdfs.rollInterval=3a1.sinks.k1.hdfs.rollSize=20a1.sinks.k1.hdfs.rollCount=5
  • channels
# 定义channels组件属性:c1a1.channels.c1.type=memory a1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100
  • 设定连接
# 设置sources和sinks的管道(即channels)a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1

spooldir监视文件

a1.sources=s1a1.channels=c1a1.sinks=s2a1.sources.s1.type=spooldira1.sources.s1.spoolDir=/root/csv/a1.sources.s1.fileHeader=truea1.channels.c1.type=memorya1.channels.c1.capacity=100000000a1.channels.c1.transactionCapacity=50000000a1.sinks.s2.type=hdfsa1.sinks.s2.hdfs.path=hdfs://master:9000/inputa1.sinks.s2.hdfs.rollSize=100000000a1.sinks.s2.hdfs.rollCount=0a1.sinks.s2.hdfs.rollInterval=0a1.sinks.s2.hdfs.batchSize=50000000a1.sources.s1.channels=c1a1.sinks.s2.channel=c1

四、flume的load-balance、failover

flume的负载均衡

flume在串联的过程中,如果前面的agent能够一次处理100个event、后面一个则只能一次处理10个event、那么就会出现堆积情况。

这时候可以多个进程进行处理前面的agent。

同一个请求只能交给一个进行处理,避免数据重复。

如何分配请求就涉及到了负载均衡的算法:轮询(round_robin) 随机(random) 权重

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent 上,示例配置,如下所示:

a1.sinkgroups=g1 a1.sinkgroups.g1.sinks=k1 k2 k3 a1.sinkgroups.g1.processortype=load_balance a1.sinkgroups.g1.processor.backoff=true#如果开启,则将失败的sink放入黑名单a1.sinkgroups.g1.processor.selector=round_robin#另外还支持randoma1.sinkgroups.g1.processor.selector.maxTimeOut=10000#在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长

flume串联跨网络传输数据

  • avro sink

  • avro source

    使用上述两个组件指定绑定的端口ip 就可以满足数据跨网络传递 通常用于flume串联架构中。

    agent1.sinks.k1.channel=c1 agent1.sinks.k1.type=avro agent1.sinks.k1.hostname=node-2 agent1.sinks.k1.port=52020# set sink2agent1.sinks.k2.channel=c1 agent1.sinks.k2.type=avro agent1.sinks.k2.hostname=node-3 agent1.sinks.k2.port=52020#set sink groupagent1.sinkgroups.gl.sinks=k1 k2#set failoveragent1.sinkgroups.gl.processor.type=load_balance agent1.sinkgroups.gl.processor.backoff=trueagent1.sinkgroups.gl.processor.selector=round robin|agent1.sinkgroups.gl.processor.selector.maxTimeOut=10000
    al.sources=rl al.sinks=kl al.channels=cl# Describe/configure the sourceal.sources.rl.type=avro al.sources.rl.channels=cl al.sources.rl.bind=node-3 al.sources.rl.port=52020# Describe the sinkal.sinks.kl.type=logger# Use a channel which buffers events in memoryal.channels.cl.type=memory al.channels.cl.capacity=1000al.channels.cl.transactionCapacity=100# Bind the source and sink to the channelal.sources.rl.channels=cl al.sinks.kl.channel=cl
http://www.cnnetsun.cn/news/3099802.html

相关文章:

  • 哔咔漫画下载器:5分钟打造个人离线漫画图书馆的终极指南
  • 揭秘IntelliJ IDEA内联变量真相:90%开发者忽略的性能陷阱与避坑指南
  • Ai驱动结合蛋白设计:Bindcraft全流程教学
  • 重构前必看!IDEA 2023.3+接口抽取的3大隐性风险与2个强制校验步骤,错过=技术债翻倍
  • HTTP/2快速重置攻击漏洞修复实战:从原理到Nginx、F5 BIG-IP修复方案
  • DownKyi:B站视频批量下载的终极解决方案
  • Win11Debloat终极指南:一键清理Windows系统垃圾,性能提升51%的完整教程
  • 为什么资深架构师严禁盲目内联变量?——基于200+企业级项目重构审计数据的反模式警示
  • CAD图纸版本管理噩梦:设计院用32维权限3天解决
  • 如何快速解锁加密音乐:免费音频解密工具完整指南
  • RAG与微调在领域专业化中的协同路径与实操决策
  • 虚幻引擎脚本系统完整指南:从零开始掌握UE4SS的强大功能
  • 实现状态栏透明
  • 三步实现百度文库文档免费获取:技术原理与实践指南
  • 第一次去医院资料别临时翻
  • 手把手教你怎么安装Bruker DataAnalysis 4.4 质谱数据处理软件下载安装教程
  • 格式转换一键搞定!视频、音频、图片、文档轻松互转!
  • 向日葵CLI如何赋能批量设备远程运维管理?附AI自动化管理实战
  • 5分钟快速上手PPTist:免费网页版PPT制作工具的终极指南
  • 私域直播SaaS横向测评:保利威、诺云、悦邻,谁更懂“社区门店”的生意逻辑?
  • 计步器算法原理及数据分析
  • PPTist免费网页版PPT制作工具:告别Office束缚,打造专业演示文稿的终极指南
  • 瑞芯微RV1126B开发板(EASY-EAI-PI2) 人员检测
  • 电子电路与PCBA:从概念到可制造组装
  • edis 单线程真的是单线程吗?源码角度全面解析
  • 【EI会议征稿进行中】第六届电子通信与计算机科学技术国际学术会议(ECCST 2026)
  • 光模块耦合,到底 “耦合” 了什么?
  • ESP芯片烧录终极指南:从零开始掌握esptool.py完整操作流程
  • 如何快速掌握Audacity:免费音频编辑的完整指南
  • OpenMP并行编程优化与性能调优实践