Just Do IT !

Golang实战之海量日志收集系统(三)简单版本logAgent的实现

字数统计: 2.3k阅读时长: 11 min
2020/03/29 Share

简单版本LogAgent的实现

这里主要是先实现核心的功能,后续再做优化和改进,主要实现能够根据配置文件中配置的日志路径去读取日志并将读取的实时推送到kafka消息队列中

关于logagent的主要结构如下:
在这里插入图片描述

.
├─conf
│      logagent.conf
│
├─kafka
│      kafka.go
│
├─logs
│      my.log
│
├─main
│      config.go
│      log.go
│      main.go
│      server.go
│
├─tailf
│      tail.go
│  go.mod
└─ go.sum

现在使用tail库能读取到日志,使用sarama库能到推送消息到kafka,我们结合这两个库,实现一边读取文件日志,一遍写入到kafka

logagent.conf :配置文件
my.log:产生的日志文件
config.go:用于初始化读取配置文件中的内容,这里的配置文件加载是通过之前自己实现的配置文件热加载包处理的
kafka.go:对kafka的操作,包括初始化kafka连接,以及给kafka发送消息
server.go:主要是tail 的相关操作,用于去读日志文件并将内容放到channel中
log.go:日志的处理与序列化
tail.go: 用于去读日志文件
main.go: 初始化入口文件,与执行server的入口函数

LogAgent的初步框架实现

现在使用tail库能读取到日志,使用sarama库能到推送消息到kafka,我们结合这两个库,实现一边读取文件日志,一遍写入到kafka

新建kafka/kafka.go和taillog/tail.go,分别先建立一个初始化函数

kafka/kafka.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package kafka

import (
"fmt"

"github.com/Shopify/sarama"
)

var (
client sarama.SyncProducer
)

func Init(addrs []string) (err error) {
config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回

// 连接kafka
client, err = sarama.NewSyncProducer([]string{addrs}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
return
}

tail/tail.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package tail

import (
"fmt"

"github.com/hpcloud/tail"
)

var (
tailObj *tail.Tail
)

func Init(filename string) (err error) {
config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true}
tailObj, err = tail.TailFile(filename, config)
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
return
}

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"logAgent/kafka"
"logAgent/taillog"
)

func main() {
// 1.初始化kafka
err := kafka.Init([]string{"127.0.0.1:29092"})
if err != nil {
fmt.Printf("init kafka failed ,err:%v\n", err)
return
}
fmt.Println("init kafka success")
// 2.初始化taillog
err = taillog.Init("./my.log")
if err != nil {
fmt.Printf("init taillog failed, err:%v\n", err)
return
}
fmt.Println("init taillog success")
}

都初始化之后,就是怎么将日志发给kafka了

tail/tail.go中创建一个ReadChan函数

1
2
3
func ReadChan() <-chan *tail.Line {
return tailObj.Lines
}

kafka/kafka.go中创建一个SendToKafka的函数,该函数接收从外部提供的topic和data参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func SendToKafka(topic, data string) {
// 构造⼀个消息
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder(data)

// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Printf("send msg failed, err: %v\n", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

在main.go中创建run函数,执行具体的任务,并在main函数中调用它

1
2
3
4
5
6
7
8
9
10
func run() {
for {
select {
case line := <-taillog.ReadChan():
kafka.SendToKafka("web_log", line.Text)
default:
time.Sleep(time.Millisecond * 500)
}
}
}

往my.log中写入一点数据进行测试

LogAgent的初步框架改进

通过github.com/astaxie/beego/logs解析配置文件, 将所有的配置信息写入logagent.conf

logagent.conf

1
2
3
4
5
6
7
8
9
10
11
[logs]
log_level = debug
log_path = E:\\Go\\logagent\\logs\\my.log

[collect]
log_path = E:\\Go\\logagent\\logs\\my.log
topic = nginx_log
chan_size = 100

[kafka]
server_addr = 0.0.0.0:9092

引入完整代码:

main.go

主要功能是初始化配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
"fmt"
"github.com/astaxie/beego/logs"
"logagent/kafka"
"logagent/tailf"
)

func main() {

fmt.Println("开始")
// 读取初始化配置文件
filename := "E:\\Go\\logagent\\conf\\logagent.conf"
err := loadInitConf("ini", filename)
if err != nil {
fmt.Printf("导入配置文件错误:%v\n", err)
panic("导入配置文件错误")
return
}

// 初始化日志信息
err = initLogger()
if err != nil {
fmt.Printf("导入日志文件错误:%v\n", err)
panic("导入日志文件错误")
return
}
// 输出成功信息
logs.Debug("导入日志成功%v", logConfig)

// 初始化tailf
err = tailf.InitTail(logConfig.CollectConf, logConfig.chanSize)
if err != nil {
logs.Error("初始化tailf失败:", err)
return
}
logs.Debug("初始化tailf成功!")

// 初始化Kafka
err = kafka.InitKafka(logConfig.KafkaAddr)
if err != nil {
logs.Error("初识化kafka producer失败:", err)
return
}
logs.Debug("初始化Kafka成功!")

// 运行
err = serverRun()
if err != nil {
logs.Error("serverRun failed:", err)
}
logs.Info("程序退出")
}

config.go
导入logagent.conf的配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main

import (
"errors"
"fmt"
"github.com/astaxie/beego/config"
"logagent/tailf"
)

var (
logConfig *Config
)

// 日志配置
type Config struct {
logLevel string
logPath string
chanSize int
KafkaAddr string
CollectConf []tailf.CollectConf
}

// 日志收集配置
func loadCollectConf(conf config.Configer) (err error) {
var c tailf.CollectConf

c.LogPath = conf.String("collect::log_path")
if len(c.LogPath) == 0 {
err = errors.New("无效的 collect::log_path ")
return
}

c.Topic = conf.String("collect::topic")
if len(c.Topic) == 0 {
err = errors.New("无效的 collect::topic ")
return
}

logConfig.CollectConf = append(logConfig.CollectConf, c)
return
}

// 导入初始化配置
func loadInitConf(confType, filename string) (err error) {
conf, err := config.NewConfig(confType, filename)
if err != nil {
fmt.Printf("初始化配置文件出错:%v\n", err)
return
}
// 导入配置信息
logConfig = &Config{}
// 日志级别
logConfig.logLevel = conf.String("logs::log_level")
if len(logConfig.logLevel) == 0 {
logConfig.logLevel = "debug"
}
// 日志输出路径
logConfig.logPath = conf.String("logs::log_path")
if len(logConfig.logPath) == 0 {
logConfig.logPath = "E:\\Go\\logagent\\logs\\my.log"
}

// 管道大小
logConfig.chanSize, err = conf.Int("collect::chan_size")
if err != nil {
logConfig.chanSize = 100
}

// Kafka
logConfig.KafkaAddr = conf.String("kafka::server_addr")
if len(logConfig.KafkaAddr) == 0 {
err = fmt.Errorf("初识化Kafka失败")
return
}

err = loadCollectConf(conf)
if err != nil {
fmt.Printf("导入日志收集配置错误:%v", err)
return
}
return
}

log.go

解析日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)

func convertLogLevel(level string) int {

switch level {
case "debug":
return logs.LevelDebug
case "warn":
return logs.LevelWarn
case "info":
return logs.LevelInfo
case "trace":
return logs.LevelTrace
}
return logs.LevelDebug
}

func initLogger() (err error) {

config := make(map[string]interface{})
config["filename"] = logConfig.logPath
config["level"] = convertLogLevel(logConfig.logLevel)
configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("初始化日志, 序列化失败:", err)
return
}
_ = logs.SetLogger(logs.AdapterFile, string(configStr))

return
}

tail.go

定义TailObjMgr结构体, 将tail监控到的配置消息通过tailObjMgr.msgChan <- textMsg放入管道中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package tailf

import (
"fmt"
"github.com/astaxie/beego/logs"
"github.com/hpcloud/tail"
"time"
)

// 将日志收集配置放在tailf包下,方便其他包引用
type CollectConf struct {
LogPath string
Topic string
}

// 存入Collect
type TailObj struct {
tail *tail.Tail
conf CollectConf
}

// 定义Message信息
type TextMsg struct {
Msg string
Topic string
}

// 管理系统所有tail对象
type TailObjMgr struct {
tailsObjs []*TailObj
msgChan chan *TextMsg
}

// 定义全局变量
var (
tailObjMgr *TailObjMgr
)

func GetOneLine() (msg *TextMsg) {
msg = <- tailObjMgr.msgChan
return
}

func InitTail(conf []CollectConf, chanSize int) (err error) {

// 加载配置项
if len(conf) == 0 {
err = fmt.Errorf("无效的log collect conf:%v", conf)
return
}
tailObjMgr = &TailObjMgr{
msgChan: make(chan *TextMsg, chanSize), // 定义Chan管道
}
// 循环导入
for _, v := range conf {
// 初始化Tail
fmt.Println(v)
tails, errTail := tail.TailFile(v.LogPath, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
})
if errTail != nil {
err = errTail
fmt.Println("tail 操作文件错误:", err)
return
}
// 导入配置项
obj := &TailObj{
conf: v,
tail: tails,
}

tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)

go readFromTail(obj)
}

return
}

// 读入日志数据
func readFromTail(tailObj *TailObj) {
for true {
msg, ok := <-tailObj.tail.Lines
if !ok {
logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
time.Sleep(100 * time.Millisecond)
continue
}

textMsg := &TextMsg{
Msg: msg.Text,
Topic: tailObj.conf.Topic,
}

// 放入chan里面
tailObjMgr.msgChan <- textMsg
}
}

server.go
在server.go中添加了sendToKafka函数, 该函数作用是取出tail.go文件中放入管道中的msg

并且调用kafka包中kafka.goSendToKafka函数发送消息到Kafka中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"github.com/astaxie/beego/logs"
"logagent/kafka"
"logagent/tailf"
"time"
)

func serverRun() (err error) {

for {
msg := tailf.GetOneLine()
err = sendToKafka(msg)
if err != nil {
logs.Error("发送消息到Kafka 失败, err:%v", err)
time.Sleep(time.Second)
continue
}
}

}

func sendToKafka(msg *tailf.TextMsg) (err error) {
//fmt.Printf("读取 msg:%s, topic:%s\n", msg.Msg, msg.Topic) // 将消息打印在终端
_ = kafka.SendToKafka(msg.Msg, msg.Topic)
return
}

kafka.go

定义了初始化kafka函数InitKafka与发送消息到Kafka的函数SendToKafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package kafka

import (
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)

var (
client sarama.SyncProducer
)

func InitKafka(addr string) (err error) {

// Kafka生产者配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回

// 新建一个生产者对象
client, err = sarama.NewSyncProducer([]string{addr}, config)
if err != nil {
logs.Error("初识化Kafka producer失败:", err)
return
}
logs.Debug("初始化Kafka producer成功,地址为:", addr)
return
}

func SendToKafka(data, topic string) (err error) {

msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder(data)

pid, offset, err := client.SendMessage(msg)

if err != nil {
logs.Error("发送信息失败, err:%v, data:%v, topic:%v", err, data, topic)
return
}

logs.Debug("read success, pid:%v, offset:%v, topic:%v\n", pid, offset, topic)
return
}

开发环境:

我这里的环境是Go1.14, 使用了Go module模块, 所以想要快速运行该项目需要在项目文件夹下 go mod init, 运行时自动下载依赖

运行main函数:

E:\Go\logagent\main>go build

注: 如果想使用Goland直接运行,这里需要同时运行main包下的四个go文件
在这里插入图片描述

运行完如图:kafka消费成功, 写入my.log成功
在这里插入图片描述
在这里插入图片描述

CATALOG
  1. 1. 简单版本LogAgent的实现
  2. 2. LogAgent的初步框架实现
  3. 3. LogAgent的初步框架改进