Just Do IT !

Golang实战之海量日志收集系统(五)根据etcd配置项创建多个tailTask

字数统计: 1.3k阅读时长: 7 min
2020/03/29 Share

通过上一篇从etcd中获取配置信息, 现在要拿着这些配置项进行日志收集

根据etcd的配置项创建多个tailtask

项目结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
.

│ go.mod
│ go.sum


├─conf
│ logagent.conf

├─kafka
│ kafka.go

├─logs
│ my.log

├─main
│ config.go
│ etcd.go
│ ip.go
│ log.go
│ main.go
│ server.go

├─tailf
│ tail.go

└─tools
└─SetConf
main.go

新增tools包, 包含SetConf里的main.go将设置的配置信息导入到etcd中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"logagent/tailf"
"time"
)

// 定义etcd的前缀key
const (
EtcdKey = "/backend/logagent/config/192.168.0.11"
)

func SetLogConfToEtcd() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println("connect failed, err:", err)
return
}

fmt.Println("connect succ")
defer cli.Close()

var logConfArr []tailf.CollectConf
logConfArr = append(
logConfArr,
tailf.CollectConf{
LogPath: "E:/nginx/logs/access.log",
Topic: "nginx_log",
},
)
logConfArr = append(
logConfArr,
tailf.CollectConf{
LogPath: "E:/nginx/logs/error.log",
Topic: "nginx_log_err",
},
)

// Json打包
data, err := json.Marshal(logConfArr)
if err != nil {
fmt.Println("json failed, ", err)
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, EtcdKey, string(data))
cancel()
if err != nil {
fmt.Println("put failed, err:", err)
return
}

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, EtcdKey)
cancel()
if err != nil {
fmt.Println("get failed, err:", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
}

func main() {
SetLogConfToEtcd()
}

直接运行文件

connect succ
/backend/logagent/config/192.168.0.11 : [{"path":"E:/nginx/logs/access.log","topic":"nginx_log"},{"path":"E:/nginx/logs/error.log","topic":"nginx_log_err"}]

成功将配置信息导入到etcd中, 下一步需要根据etcd中的配置项, 创建多个tailtask
我们想要实现的是

E:/nginx/logs/access.log 日志文件发送到kafka的nginx_log中
E:/nginx/logs/error.log 日志文件发送到kafka的nginx_log_err中

main包中新建etcd.go文件

用于初始化连接etcd与从etcd中取出配置信息

因为存入etcd时传入的是json格式,所以取出使用时需要反序列化json.Unmarshal(v.Value, &collectConf)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
"github.com/coreos/etcd/clientv3"
"logagent/tailf"
"strings"
"time"
)

type EtcdClient struct {
client *clientv3.Client
}

var (
etcdClient *EtcdClient
)

func initEtcd(addr string, key string) (collectConf []tailf.CollectConf, err error) {
// 初始化连接etcd
cli, err := clientv3.New(clientv3.Config{
//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
Endpoints: []string{addr},
DialTimeout: 5 * time.Second,
})
if err != nil {
logs.Error("连接etcd失败:", err)
return
}

etcdClient = &EtcdClient{
client: cli,
}

// 如果Key不是以"/"结尾, 则自动加上"/"
if strings.HasSuffix(key, "/") == false {
key = key + "/"
}

for _, ip := range localIPArray {
etcdKey := fmt.Sprintf("%s%s", key, ip)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, etcdKey)
if err != nil {
logs.Error("etcd get请求失败:", err)
continue
}
cancel()
logs.Debug("resp from etcd:%v", resp.Kvs)
for _, v := range resp.Kvs {
if string(v.Key) == etcdKey {
// 将从etcd中取出来的json格式反序列化为结构体
err = json.Unmarshal(v.Value, &collectConf)
if err != nil {
logs.Error("反序列化失败:", err)
continue
}
logs.Debug("日志设置为%v", collectConf)
}
}
}

logs.Debug("连接etcd成功")
return
}

修改logagent.conf配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[logs]
log_level = debug
log_path = E:\\Go\\logagent\\logs\\my.log

[collect]
log_path = E:\\Go\\logagent\\logs\\my.log
topic = nginx_log
chan_size = 100

[kafka]
server_addr = 0.0.0.0:9092

[etcd]
addr = 0.0.0.0:2379
configKey= /backend/logagent/config/

main包中新建ip.go文件

使用net包取出本机所有的网卡ip去连接etcd

考虑到以后添加新服务器时不需要手动添加ip,这里将ip信息全部存入localIPArray数组中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"net"
)

var (
localIPArray []string
)

func init() {
addrs, err := net.InterfaceAddrs()
if err != nil {
panic(fmt.Sprintf("获取网卡ip失败, %v", err))
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
localIPArray = append(localIPArray, ipnet.IP.String())
}
}
}

fmt.Println(localIPArray)
}

config.go加入etcd配置

1
2
3
4
5
6
7
8
9
10
11
12
// etcd
logConfig.etcdAddr = conf.String("etcd::addr")
if len(logConfig.etcdAddr) == 0 {
err = fmt.Errorf("初识化etcd addr失败")
return
}

logConfig.etcdKey = conf.String("etcd::configKey")
if len(logConfig.etcdKey) == 0 {
err = fmt.Errorf("初识化etcd configKey失败")
return
}
1
2
3
4
5
6
7
8
9
10
11
12
// 日志配置
type Config struct {
logLevel string
logPath string

chanSize int
KafkaAddr string
collectConf []tailf.CollectConf

etcdAddr string
etcdKey string
}

tail.go修改序列化

1
2
3
4
type CollectConf struct {
LogPath string `json:"logpath"`
Topic string `json:"topic"`
}

main.go函数

initEtcd初始化etcd函数放到InitTail函数之前, 使etcd中的配置项与tailf连接起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"fmt"
"github.com/astaxie/beego/logs"
"logagent/kafka"
"logagent/tailf"
)

func main() {

fmt.Println("开始")
// 读取初始化配置文件
filename := "E:\\Go\\logagent\\conf\\logagent.conf"
err := loadInitConf("ini", filename)
if err != nil {
fmt.Printf("导入配置文件错误:%v\n", err)
panic("导入配置文件错误")
return
}

// 初始化日志信息
err = initLogger()
if err != nil {
fmt.Printf("导入日志文件错误:%v\n", err)
panic("导入日志文件错误")
return
}
// 输出成功信息
logs.Debug("导入日志成功%v", logConfig)

// 初识化etcd
collectConf, err := initEtcd(logConfig.etcdAddr, logConfig.etcdKey)
if err != nil {
logs.Error("初始化etcd失败",err)
}
logs.Debug("初始化etcd成功!")

// 初始化tailf
err = tailf.InitTail(collectConf, logConfig.chanSize)
if err != nil {
logs.Error("初始化tailf失败:", err)
return
}
logs.Debug("初始化tailf成功!")

// 初始化Kafka
err = kafka.InitKafka(logConfig.KafkaAddr)
if err != nil {
logs.Error("初识化Kafka producer失败:", err)
return
}
logs.Debug("初始化Kafka成功!")

// 运行
err = serverRun()
if err != nil {
logs.Error("serverRun failed:", err)
}
logs.Info("程序退出")
}

运行, 成功将etcd中的配置项经过tailf得到配置, 创建了多个tailtask

1
2
3
4
5
6
[169.254.109.181 169.254.30.148 192.168.106.1 192.168.0.1 192.168.0.11 169.254.153.68]
开始
{E:/nginx/logs/access.log nginx_log}
{E:/nginx/logs/error.log nginx_log_err}
2020/03/23 18:14:44 Waiting for E:/nginx/logs/access.log to appear...
2020/03/23 18:14:44 Waiting for E:/nginx/logs/error.log to appear...

CATALOG
  1. 1. 根据etcd的配置项创建多个tailtask
    1. 1.1. 新增tools包, 包含SetConf里的main.go将设置的配置信息导入到etcd中
    2. 1.2. main包中新建etcd.go文件
    3. 1.3. 修改logagent.conf配置文件
    4. 1.4. main包中新建ip.go文件
    5. 1.5. config.go加入etcd配置
    6. 1.6. tail.go修改序列化
    7. 1.7. main.go函数