Just Do IT !

Golang实战之海量日志收集系统(六)监视etcd配置项的变更

字数统计: 2.4k | 阅读时长: 12 min
2020/03/29 Share

在上一篇中我们已经实现了从etcd中获取配置信息并创建tailTask任务

现在我们通过etcd的watch机制来监听配置的变更, 并应用新的配置

监视etcd配置项的变更

实现watch各个不同ip

在真实的生产环境中经常会添加新的服务器, 这时我们需要借助之前的ip.go获取所有ip节点, 并对每个节点对应的配置进行实时监控

修改EtcdClient结构体增加keys

1
2
3
4
// EtcdClient bundles the etcd connection with the list of config keys that
// should be watched.
type EtcdClient struct {
	client *clientv3.Client // etcd v3 client handle
	keys   []string         // keys that initEtcdWatcher iterates, one watcher per key
}

在main/etcd.go中添加initEtcdWatcher与watchKey两个函数, 并且在initEtcd函数中调用initEtcdWatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// initEtcdWatcher launches one watchKey goroutine for every key stored in
// etcdClient.keys; each watcher connects to the etcd endpoint addr.
func initEtcdWatcher(addr string) {
	keys := etcdClient.keys
	for i := 0; i < len(keys); i++ {
		go watchKey(addr, keys[i])
	}
}

// watchKey opens its own etcd connection to addr, watches key and prints
// every event received for it. It blocks until the watch channel is closed
// and is meant to be run in its own goroutine.
func watchKey(addr string, key string) {
	// Connect to etcd.
	cli, err := clientv3.New(clientv3.Config{
		//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		Endpoints:   []string{addr},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logs.Error("连接etcd失败:", err)
		return
	}
	// Release the connection once the watch loop below ends; without this
	// every finished watcher would leak a client.
	defer cli.Close()

	logs.Debug("开始监控key:", key)

	// Watch the key and dump every event.
	wch := cli.Watch(context.Background(), key)
	for resp := range wch {
		for _, ev := range resp.Events {
			fmt.Printf("Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2020/03/24 15:21:27.632 [D]  导入日志成功&{debug E:\\Go\\logagent\\logs\\my.log 100 0.0.0.0:9092 [{E:\\Go\\logagent\\logs\\my.log nginx_log}] 0.0.0.0:2379 /backend/logagent/config/}
2020/03/24 15:21:27.646 [D] resp from etcd:[]
2020/03/24 15:21:27.646 [D] resp from etcd:[]
2020/03/24 15:21:27.647 [D] resp from etcd:[]
2020/03/24 15:21:27.647 [D] resp from etcd:[]
2020/03/24 15:21:27.647 [D] resp from etcd:[key:"/backend/logagent/config/192.168.0.11" create_revision:6 mod_revision:9 version:4 value:"[{\"logpath\":\"E:/nginx/logs/access.log\",\"topic\":\"nginx_log\"},{\"logpath\":\"E:/nginx/logs/error.log\",\"topic\":\"nginx_log_err\"}]" ]
2020/03/24 15:21:27.647 [D] 日志设置为[{E:/nginx/logs/access.log nginx_log} {E:/nginx/logs/error.log nginx_log_err}]
2020/03/24 15:21:27.648 [D] resp from etcd:[]
2020/03/24 15:21:27.648 [D] 连接etcd成功
2020/03/24 15:21:27.648 [D] 初始化etcd成功!
2020/03/24 15:21:27.648 [D] 初始化tailf成功!
2020/03/24 15:21:27.648 [D] 开始监控key: /backend/logagent/config/169.254.109.181
2020/03/24 15:21:27.648 [D] 开始监控key: /backend/logagent/config/192.168.0.1
2020/03/24 15:21:27.648 [D] 开始监控key: /backend/logagent/config/192.168.106.1
2020/03/24 15:21:27.650 [D] 开始监控key: /backend/logagent/config/169.254.30.148
2020/03/24 15:21:27.650 [D] 开始监控key: /backend/logagent/config/192.168.0.11
2020/03/24 15:21:27.651 [D] 开始监控key: /backend/logagent/config/169.254.153.68
2020/03/24 15:21:27.653 [D] 初始化Kafka producer成功,地址为: 0.0.0.0:9092
2020/03/24 15:21:27.653 [D] 初始化Kafka成功!

实现watch新的配置

修改etcd/etcd.go

etcd/etcd.go新增initEtcdWatcher与watchKey函数, 用于监视etcd中key的变化, 当配置有变化时通知tailf更新

其中通过ev.Type判断是delete还是put操作, 并分别处理

getConfSucc得到配置成功, 进行tailf中的UpdateConfig操作, 进行更新配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// initEtcdWatcher launches one watchKey goroutine for every key stored in
// etcdClient.keys; each watcher connects to the etcd endpoint addr.
func initEtcdWatcher(addr string) {
	keys := etcdClient.keys
	for i := 0; i < len(keys); i++ {
		go watchKey(addr, keys[i])
	}
}

// watchKey opens its own etcd connection to addr and watches key forever.
// Every PUT on key is decoded as a JSON array of tailf.CollectConf and
// handed to tailf.UpdateConfig. DELETE events are only logged; running
// tasks are left untouched. It never returns after a successful connect and
// is meant to be run in its own goroutine.
func watchKey(addr string, key string) {
	// Connect to etcd.
	cli, err := clientv3.New(clientv3.Config{
		//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		Endpoints:   []string{addr},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logs.Error("连接etcd失败:", err)
		return
	}

	logs.Debug("开始监控key:", key)

	for {
		wch := cli.Watch(context.Background(), key)
		for resp := range wch {
			if err := resp.Err(); err != nil {
				logs.Error("监控key[%s]出错: %v", key, err)
				continue
			}

			// State is scoped to a single response so that one malformed
			// value or a DELETE cannot poison or replay later updates.
			var collectConf []tailf.CollectConf
			updated := false

			for _, ev := range resp.Events {
				// DELETE: nothing to decode.
				if ev.Type == mvccpb.DELETE {
					logs.Warn("删除Key[%s]配置", key)
					continue
				}
				// PUT: decode the new configuration.
				if ev.Type == mvccpb.PUT && string(ev.Kv.Key) == key {
					var conf []tailf.CollectConf
					if err := json.Unmarshal(ev.Kv.Value, &conf); err != nil {
						logs.Error("反序列化key[%s]失败: %v", key, err)
						continue
					}
					// With several PUTs in one response the last one wins.
					collectConf = conf
					updated = true
				}
				logs.Debug("get config from etcd ,Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
			}

			// Only apply a configuration that was actually decoded from
			// this response.
			if updated {
				logs.Debug("get config from etcd success, %v", collectConf)
				if err := tailf.UpdateConfig(collectConf); err != nil {
					logs.Error("更新key[%s]配置失败: %v", key, err)
				}
			}
		}

		// The watch channel was closed; pause briefly before re-watching
		// so a persistent failure does not turn into a busy loop.
		logs.Warn("key[%s]的watch通道已关闭, 重新监听", key)
		time.Sleep(time.Second)
	}
}

修改tailf/tail.go

TailObj结构体中添加status(当前配置状态)与exitChan(管道里有数值时即为退出)

1
2
3
4
5
6
// TailObj is one log-collection task: a tailed file together with the
// configuration entry it was created from.
type TailObj struct {
	tail     *tail.Tail  // tail handle whose Lines channel readFromTail consumes
	conf     CollectConf // configuration entry (log path and topic) of this task
	status   int         // StatusNormal or StatusDelete, maintained by UpdateConfig
	exitChan chan int    // receiving a value here makes readFromTail return
}

1
2
3
4
5
// Task states stored in TailObj.status.
const (
	// StatusNormal (1) marks a task in the normal state.
	StatusNormal = iota + 1
	// StatusDelete (2) marks a task in the deleted state.
	StatusDelete
)

UpdateConfig函数中遍历所有配置项

isRuning为false时, 即可认为此项配置不存在, 需要新建任务, 调用createNewTask函数

重新遍历所有配置项, 如果obj.status == StatusDelete时, 将obj.exitChan <- 1传入数值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// UpdateConfig reconciles the running tail tasks with confs.
//
// A task is started (via createNewTask) for every entry whose LogPath is not
// being tailed yet. Every running task whose LogPath no longer appears in
// confs is marked StatusDelete, signalled through its exitChan and dropped
// from the manager. Dropping it matters for two reasons: a removed task is
// signalled exactly once, so the send cannot block on a full exitChan during
// a later update, and a path that is re-added later gets a fresh task.
//
// The returned error is always nil.
//
// NOTE(review): tailObjMgr.tailsObjs is read and replaced here without any
// locking; confirm that concurrent callers (one watcher goroutine per etcd
// key) are serialized elsewhere.
func UpdateConfig(confs []CollectConf) (err error) {
	// Start a task for every configured path that is not running yet.
	for _, oneConf := range confs {
		// Tasks are identified by their log path.
		var isRuning = false
		for _, obj := range tailObjMgr.tailsObjs {
			if oneConf.LogPath == obj.conf.LogPath {
				isRuning = true
				obj.status = StatusNormal
				break
			}
		}
		if isRuning {
			continue
		}
		createNewTask(oneConf)
	}

	// Index the wanted paths once so the removal pass is a simple lookup.
	wanted := make(map[string]struct{}, len(confs))
	for _, oneConf := range confs {
		wanted[oneConf.LogPath] = struct{}{}
	}

	// Keep the tasks that are still configured; stop and forget the rest.
	tailObjs := make([]*TailObj, 0, len(tailObjMgr.tailsObjs))
	for _, obj := range tailObjMgr.tailsObjs {
		if _, ok := wanted[obj.conf.LogPath]; ok {
			obj.status = StatusNormal
			tailObjs = append(tailObjs, obj)
			continue
		}
		obj.status = StatusDelete
		// Tell readFromTail to exit. The task is not kept in the list, so
		// this send happens at most once per task.
		obj.exitChan <- 1
	}
	tailObjMgr.tailsObjs = tailObjs
	return nil
}

createNewTask函数为新建一个tailtask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// createNewTask starts tailing conf.LogPath, registers the resulting task in
// tailObjMgr.tailsObjs and launches readFromTail for it in a new goroutine.
// If the file cannot be tailed the error is logged and no task is registered.
func createNewTask(conf CollectConf) {
	tailCfg := tail.Config{
		ReOpen: true,
		Follow: true,
		// Whence 2 means "relative to the end of the file".
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}

	handle, openErr := tail.TailFile(conf.LogPath, tailCfg)
	if openErr != nil {
		logs.Error("收集文件[%s]错误: %v", conf.LogPath, openErr)
		return
	}

	// exitChan is buffered so a single exit signal can be sent without
	// waiting for the reader.
	task := &TailObj{
		tail:     handle,
		conf:     conf,
		exitChan: make(chan int, 1),
	}
	tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, task)

	go readFromTail(task)
}

修改readFromTail函数, 在将TextMsg写入管道(即向Kafka传入数据)的同时, 通过tailObj.exitChan判断该任务是否需要退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// readFromTail forwards every line produced by tailObj.tail to
// tailObjMgr.msgChan, tagged with the task's topic, until a value arrives on
// tailObj.exitChan. On exit the underlying tail is stopped so that its
// goroutine and file handle are released instead of leaking.
func readFromTail(tailObj *TailObj) {
	for {
		select {
		case msg, ok := <-tailObj.tail.Lines:
			if !ok {
				// NOTE(review): a closed Lines channel keeps yielding !ok,
				// so this branch repeats every 100ms until an exit signal
				// arrives — confirm that retrying is the intended behavior.
				logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
				time.Sleep(100 * time.Millisecond)
				continue
			}
			textMsg := &TextMsg{
				Msg:   msg.Text,
				Topic: tailObj.conf.Topic,
			}
			// Hand the line over to the consumer side.
			tailObjMgr.msgChan <- textMsg

		// The config entry was removed: shut this task down.
		case <-tailObj.exitChan:
			logs.Warn("tail obj 退出, 配置项为conf:%v", tailObj.conf)
			if err := tailObj.tail.Stop(); err != nil {
				logs.Error("停止tail[%s]失败: %v", tailObj.tail.Filename, err)
			}
			return
		}
	}
}

最后修改InitTail函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// InitTail replaces the global tailObjMgr with a fresh manager whose message
// channel has capacity chanSize, then starts one tail task per entry in conf.
// An empty conf is logged as an error but does not abort initialization; the
// returned error is always nil.
func InitTail(conf []CollectConf, chanSize int) (err error) {
	// Buffered channel carrying collected lines.
	msgs := make(chan *TextMsg, chanSize)
	tailObjMgr = &TailObjMgr{msgChan: msgs}

	// Report a missing configuration.
	if len(conf) == 0 {
		logs.Error("无效的日志collect配置: ", conf)
	}

	// Create one task per configured log file.
	for i := range conf {
		createNewTask(conf[i])
	}

	return err
}

测试

1
2
3
4
5
6
go build
./logagent
[169.254.109.181 169.254.30.148 192.168.106.1 192.168.0.1 192.168.0.11 169.254.153.68]
开始
2020/03/25 11:32:57 Waiting for E:/nginx/logs/access.log to appear...
2020/03/25 11:32:57 Waiting for E:/nginx/logs/error.log to appear...

向etcd中进行PUT与DELETE操作

delete操作时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2020/03/25 11:44:09.733 [D]  导入日志成功&{debug E:\\Go\\logagent\\logs\\my.log 100 0.0.0.0:9092 [{E:\\Go\\logagent\\logs\\my.log nginx_log}] 0.0.0.0:2379 /backend/logagent/config/}
2020/03/25 11:44:09.738 [D] resp from etcd:[]
2020/03/25 11:44:09.739 [D] resp from etcd:[]
2020/03/25 11:44:09.739 [D] resp from etcd:[]
2020/03/25 11:44:09.740 [D] resp from etcd:[]
2020/03/25 11:44:09.741 [D] resp from etcd:[key:"/backend/logagent/config/192.168.0.11" create_revision:6 mod_revision:10 version:5 value:"[{\"logpath\":\"E:/nginx/logs/access.log\",\"topic\":\"nginx_log\"},{\"logpath\":\"E:/nginx/logs/error.log\",\"topic\":\"nginx_log_err\"},{\"logpath\":\"E:/nginx/logs/error2.log\",\"topic\":\"nginx_log_err2\"}]" ]
2020/03/25 11:44:09.741 [D] 日志设置为[{E:/nginx/logs/access.log nginx_log} {E:/nginx/logs/error.log nginx_log_err} {E:/nginx/logs/error2.log nginx_log_err2}]
2020/03/25 11:44:09.741 [D] resp from etcd:[]
2020/03/25 11:44:09.741 [D] 连接etcd成功
2020/03/25 11:44:09.741 [D] 初始化etcd成功!
2020/03/25 11:44:09.741 [D] 初始化tailf成功!
2020/03/25 11:44:09.741 [D] 开始监控key: /backend/logagent/config/169.254.109.181
2020/03/25 11:44:09.741 [D] 开始监控key: /backend/logagent/config/169.254.30.148
2020/03/25 11:44:09.741 [D] 开始监控key: /backend/logagent/config/192.168.0.1
2020/03/25 11:44:09.742 [D] 开始监控key: /backend/logagent/config/192.168.0.11
2020/03/25 11:44:09.742 [D] 开始监控key: /backend/logagent/config/192.168.106.1
2020/03/25 11:44:09.742 [D] 开始监控key: /backend/logagent/config/169.254.153.68
2020/03/25 11:44:09.745 [D] 初始化Kafka producer成功,地址为: 0.0.0.0:9092
2020/03/25 11:44:09.745 [D] 初始化Kafka成功!
2020/03/25 11:44:30.820 [D] get config from etcd ,Type: PUT, Key:/backend/logagent/config/192.168.0.11, Value:[{"logpath":"E:/nginx/logs/access.log","topic":"nginx_log"},{"logpath":"E:/nginx/logs/error.log","topic":"nginx_log_err"}]

2020/03/25 11:44:30.820 [D] get config from etcd success, [{E:/nginx/logs/access.log nginx_log} {E:/nginx/logs/error.log nginx_log_err}]
2020/03/25 11:44:30.820 [W] tail obj 退出, 配置项为conf:{E:/nginx/logs/error2.log nginx_log_err2}

put操作

1
2
3
2020/03/25 11:46:18.082 [D]  get config from etcd ,Type: PUT, Key:/backend/logagent/config/192.168.0.11, Value:[{"logpath":"E:/nginx/logs/access.log","topic":"nginx_log"},{"logpath":"E:/nginx/logs/error.log","topic":"nginx_log_err"},{"logpath":"E:/nginx/logs/error2.log","topic":"nginx_log_err2"}]

2020/03/25 11:46:18.082 [D] get config from etcd success, [{E:/nginx/logs/access.log nginx_log} {E:/nginx/logs/error.log nginx_log_err} {E:/nginx/logs/error2.log nginx_log_err2}]

Kafka消费成功
在这里插入图片描述
已经成功获取到日志,

至此logAgent 的部分就写完了, 后续开发logTransfer将从Kafka里面获取日志信息入库。可以存Elasticsearch,供Kibana进行可视化查询, 最后通过beego开发web界面管理etcd

CATALOG
  1. 1. 监视etcd配置项的变更
    1. 1.1. 实现watch各个不同ip
    2. 1.2. 实现watch新的配置
      1. 1.2.1. 修改etcd/etcd.go
      2. 1.2.2. 修改tailf/tail.go
    3. 1.3. 测试