在上一篇中我们已经实现了从etcd
中获取配置信息并创建tailTask
任务
现在我们来通过etcd
的watch
实现新配置的变更
监视etcd配置项的变更
实现watch各个不同ip
在真实生产环境中时会常常添加新的服务器, 这时我们需要借助之前的ip.go
获取所有ip节点, 并且实时监控
修改EtcdClient
结构体增加keys
1
2
3
4type EtcdClient struct {
client *clientv3.Client
keys []string
}
在main/etcd.go中添加initEtcdWatcher
与watchKey
函数并且在函数initEtcd
中调用
1 | // 初始化多个watch监控etcd中配置节点 |
1 | 2020/03/24 15:21:27.632 [D] 导入日志成功&{debug E:\\Go\\logagent\\logs\\my.log 100 0.0.0.0:9092 [{E:\\Go\\logagent\\logs\\my.log nginx_log}] 0.0.0.0:2379 /backend/logagent/config/} |
实现watch新的配置
修改etcd/etcd.go
etcd/etcd.go
新增initEtcdWatcher
与watchKey
函数,用于监视etcd
的key
的变化,当有变化时通知
其中通过ev.Type
判断是delect
还是put
处理
当getConfSucc
得到配置成功, 进行tailf中的UpdateConfig
操作, 进行更新配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52// 初始化多个watch监控etcd中配置节点
func initEtcdWatcher(addr string) {
for _, key := range etcdClient.keys {
go watchKey(addr, key)
}
}
func watchKey(addr string, key string) {
// 初始化连接etcd
cli, err := clientv3.New(clientv3.Config{
//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
Endpoints: []string{addr},
DialTimeout: 5 * time.Second,
})
if err != nil {
logs.Error("连接etcd失败:", err)
return
}
logs.Debug("开始监控key:", key)
// Watch操作
for {
var collectConf []tailf.CollectConf
var getConfSucc = true
wch := cli.Watch(context.Background(), key)
for resp := range wch {
for _, ev := range resp.Events {
// DELETE处理
if ev.Type == mvccpb.DELETE {
logs.Warn("删除Key[%s]配置", key)
continue
}
// PUT处理
if ev.Type == mvccpb.PUT && string(ev.Kv.Key) == key {
err = json.Unmarshal(ev.Kv.Value, &collectConf)
if err != nil {
logs.Error("反序列化key[%s]失败:", err)
getConfSucc = false
continue
}
}
logs.Debug("get config from etcd ,Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
}
if getConfSucc {
logs.Debug("get config from etcd success, %v", collectConf)
_ = tailf.UpdateConfig(collectConf)
}
}
}
}
修改tailf/tail.go
在TailObj
结构体中添加status
(当前配置状态)与exitChan
(管道里有数值时即为退出)1
2
3
4
5
6type TailObj struct {
tail *tail.Tail
conf CollectConf
status int
exitChan chan int
}
1 | // 定义常量 |
在UpdateConfig
函数中遍历所有配置项
当isRuning
为false时, 即可认为此项配置不存在, 需要新建任务, 调用createNewTask
函数
重新遍历所有配置项, 如果obj.status == StatusDelete
时, 将obj.exitChan <- 1
传入数值
1 | // 新增etcd配置项 |
createNewTask
函数为新建一个tailtask1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25func createNewTask(conf CollectConf) {
// 初始化Tailf实例
tails, errTail := tail.TailFile(conf.LogPath, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
})
if errTail != nil {
logs.Error("收集文件[%s]错误: %v", conf.LogPath, errTail)
return
}
// 导入配置项
obj := &TailObj{
conf: conf,
exitChan: make(chan int, 1),
}
obj.tail = tails
tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)
go readFromTail(obj)
}
修改readFromTail
函数, 当向TextMsg
即想Kafka传入数据时, 判断该tailObj.exitChan
是否为退出1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// 读入日志数据
func readFromTail(tailObj *TailObj) {
for true {
select {
case msg, ok := <-tailObj.tail.Lines:
if !ok {
logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
time.Sleep(100 * time.Millisecond)
continue
}
textMsg := &TextMsg{
Msg: msg.Text,
Topic: tailObj.conf.Topic,
}
// 放入chan里
tailObjMgr.msgChan <- textMsg
// 如果exitChan为1, 则删除对应配置项
case <-tailObj.exitChan:
logs.Warn("tail obj 退出, 配置项为conf:%v", tailObj.conf)
return
}
}
}
最后修改InitTail
函数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// 初始化tail
func InitTail(conf []CollectConf, chanSize int) (err error) {
tailObjMgr = &TailObjMgr{
msgChan: make(chan *TextMsg, chanSize), // 定义Chan管道
}
// 加载配置项
if len(conf) == 0 {
logs.Error("无效的日志collect配置: ", conf)
}
// 循环导入
for _, v := range conf {
createNewTask(v)
}
return
}
测试
1 | go build |
向etcd中进行PUT
与DELECE
delece
操作时
1 | 2020/03/25 11:44:09.733 [D] 导入日志成功&{debug E:\\Go\\logagent\\logs\\my.log 100 0.0.0.0:9092 [{E:\\Go\\logagent\\logs\\my.log nginx_log}] 0.0.0.0:2379 /backend/logagent/config/} |
put
操作1
2
32020/03/25 11:46:18.082 [D] get config from etcd ,Type: PUT, Key:/backend/logagent/config/192.168.0.11, Value:[{"logpath":"E:/nginx/logs/access.log","topic":"nginx_log"},{"logpath":"E:/nginx/logs/error.log","topic":"nginx_log_err"},{"logpath":"E:/nginx/logs/error2.log","topic":"nginx_log_err2"}]
2020/03/25 11:46:18.082 [D] get config from etcd success, [{E:/nginx/logs/access.log nginx_log} {E:/nginx/logs/error.log nginx_log_err} {E:/nginx/logs/error2.log nginx_log_err2}]
Kafka
消费成功
已经成功获取到日志,
至此logAgent
的部分就写完了, 后续开发logTransfe
r将从Kafka
里面获取日志信息入库。可以存Elasticsearch
,供Kibana
进行可视化查询, 最后通过beego
开发web
界面管理etcd