通过上一篇从etcd中获取配置信息, 现在要拿着这些配置项进行日志收集
根据etcd的配置项创建多个tailtask
项目结构:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29.
│ go.mod
│ go.sum
│
│
├─conf
│ logagent.conf
│
├─kafka
│ kafka.go
│
├─logs
│ my.log
│
├─main
│ config.go
│ etcd.go
│ ip.go
│ log.go
│ main.go
│ server.go
│
├─tailf
│ tail.go
│
└─tools
└─SetConf
main.go
新增tools包, 包含SetConf
里的main.go
将设置的配置信息导入到etcd中
1 | package main |
直接运行文件
connect succ
/backend/logagent/config/192.168.0.11 : [{"path":"E:/nginx/logs/access.log","topic":"nginx_log"},{"path":"E:/nginx/logs/error.log","topic":"nginx_log_err"}]
成功将配置信息导入到etcd中, 下一步需要根据etcd中的配置项, 创建多个tailtask
我们想要实现的是
E:/nginx/logs/access.log 日志文件发送到kafka的nginx_log中
E:/nginx/logs/error.log 日志文件发送到kafka的nginx_log_err中
main包中新建etcd.go文件
用于初始化连接etcd与从etcd中取出配置信息
因为存入etcd时传入的是json格式,所以取出使用时需要反序列化json.Unmarshal(v.Value, &collectConf)
1 | package main |
修改logagent.conf配置文件
1 | [logs] |
main包中新建ip.go文件
使用net
包取出本机所有的网卡ip去连接etcd
考虑到以后添加新服务器时不需要手动添加ip,这里将ip信息全部存入localIPArray
数组中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26package main
import (
"fmt"
"net"
)
var (
localIPArray []string
)
func init() {
addrs, err := net.InterfaceAddrs()
if err != nil {
panic(fmt.Sprintf("获取网卡ip失败, %v", err))
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
localIPArray = append(localIPArray, ipnet.IP.String())
}
}
}
fmt.Println(localIPArray)
}
config.go加入etcd配置
1 | // etcd |
1 | // 日志配置 |
tail.go修改序列化
1 | type CollectConf struct { |
main.go函数
将initEtcd
初始化etcd函数放到InitTail
函数之前, 使etcd中的配置项与tailf连接起来1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61package main
import (
"fmt"
"github.com/astaxie/beego/logs"
"logagent/kafka"
"logagent/tailf"
)
func main() {
fmt.Println("开始")
// 读取初始化配置文件
filename := "E:\\Go\\logagent\\conf\\logagent.conf"
err := loadInitConf("ini", filename)
if err != nil {
fmt.Printf("导入配置文件错误:%v\n", err)
panic("导入配置文件错误")
return
}
// 初始化日志信息
err = initLogger()
if err != nil {
fmt.Printf("导入日志文件错误:%v\n", err)
panic("导入日志文件错误")
return
}
// 输出成功信息
logs.Debug("导入日志成功%v", logConfig)
// 初识化etcd
collectConf, err := initEtcd(logConfig.etcdAddr, logConfig.etcdKey)
if err != nil {
logs.Error("初始化etcd失败",err)
}
logs.Debug("初始化etcd成功!")
// 初始化tailf
err = tailf.InitTail(collectConf, logConfig.chanSize)
if err != nil {
logs.Error("初始化tailf失败:", err)
return
}
logs.Debug("初始化tailf成功!")
// 初始化Kafka
err = kafka.InitKafka(logConfig.KafkaAddr)
if err != nil {
logs.Error("初识化Kafka producer失败:", err)
return
}
logs.Debug("初始化Kafka成功!")
// 运行
err = serverRun()
if err != nil {
logs.Error("serverRun failed:", err)
}
logs.Info("程序退出")
}
运行, 成功将etcd中的配置项经过tailf得到配置, 创建了多个tailtask1
2
3
4
5
6[169.254.109.181 169.254.30.148 192.168.106.1 192.168.0.1 192.168.0.11 169.254.153.68]
开始
{E:/nginx/logs/access.log nginx_log}
{E:/nginx/logs/error.log nginx_log_err}
2020/03/23 18:14:44 Waiting for E:/nginx/logs/access.log to appear...
2020/03/23 18:14:44 Waiting for E:/nginx/logs/error.log to appear...