一、前言
最近在数据开发中,使用到了 Fluentd,具体情况是:移动端上传到 OSS 的日志文件,每个文件大概几 k,但有上千万之多,导致 MaxCompute 在运行的时候报内存栈溢出的错误,因此我们使用了 Fluentd,主要做的是小文件合并操作(MaxCompute 自带的文件合并无法处理)。
合并策略:OSS - Fluentd - OSS,先从 OSS 获取数据(小文件),Fluentd 进行合并之后,再重新上传至 OSS,MaxCompute 把 Fluentd 合并之后的文件当作数据源,问题解决。
二、Fluentd 简介
Fluentd 是一个开源的数据收集器,它允许统一进行数据的收集和使用,以便更好地使用以及理解数据。
Fluentd 架构图(来自 Fluentd 官网):
三、Fluentd 集成
Fluentd 集成很简单,下载其商业化应用 td-agent,下载地址:https://docs.fluentd.org/installation。
接着安装 Fluentd 的 OSS 插件:
1
|
/usr/sbin/td-agent-gem install fluent-plugin-aliyun-oss
|
四、Fluentd 配置
1. 从 OSS 读数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
<source>
@type oss
tag load
endpoint oss-cn-hangzhou-internal.aliyuncs.com # endpoint 可以在阿里云查看
bucket *** # 桶名
access_key_id *** # 你的access_key_id
access_key_secret *** # 你的access_key_secret
flush_batch_lines 10000
store_as text
#pos_file /var/log/apache2/access_log.pos
<mns> # 监听的mns
endpoint 20967412.mns.cn-hangzhou-internal.aliyuncs.com # endpoint 可以在阿里云查看
queue *** # 监听的mns队列名称
wait_seconds 0 # 等待时间
poll_interval_seconds 0
</mns>
#<parse>
# @type json
#</parse>
</source>
|
从 OSS 读数据,需要配合阿里云的 MNS 以及 OSS 的事件通知来做,具体看文档配置事件通知。
2. 向 OSS 写数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
<match load>
@type oss
endpoint oss-cn-hangzhou-internal.aliyuncs.com # endpoint 可以在阿里云查看
bucket *** # 桶名
access_key_id *** # 你的access_key_id
access_key_secret *** # 你的access_key_secret
upload_crc_enable false
path *** # 上传到哪个文件夹下
auto_create_bucket true
store_as text
time_slice_format %Y%m%d
key_format "%{path}/%{time_slice}/aliyun_events_%{index}_%{thread_id}_1.%{file_extension}" # 合并后的文件名字
<buffer tag,time>
@type memory
path /septnet/fluentd_buf/oss-log-fluent.log
timekey 1800 # 多久上传一次,单位s
timekey_wait 1
flush_thread_count 5
#flush_interval 60s
#total_limit_size 2m
chunk_limit_size 100m
</buffer>
#<format>
# @type json
#</format>
</match>
|
-
2020.02.18 补充
-
2020.03.25 补充:
- 最近我们部门测试组在测试 Fluentd 日志合并时,出现了这样一个问题:在日志量特别大的情况下,Fluentd 会进行重复合并,比如原始日志有 10000 条,Fluentd 合并后会有 10200 条,其中 200 条是二次或多次重复合并导致的。为什么会出现这样的问题呢?这其实是由 Fluentd 自身的机制引起的。Fluentd 是日志事件交付系统,主要做的是日志不丢,因此它默认使用的是 At least once,即每条消息至少交付一次。具体文档可参考这里:High Availability Config。那么该怎么解决这个问题呢?我这边做的比较简单粗暴,但目前看也是唯一切实可行的,就是在数仓直接把重复日志过滤掉。
参考文档
fluentd 官方文档
fluentd社区
fluent-plugin-oss
fluentd简介
Fluentd性能优化实践