Implementing a Simple Version of LogAgent
The goal here is to get the core functionality working first and leave optimization and improvements for later: read logs from the log paths listed in the configuration file and push each line to a Kafka message queue in real time.
The main structure of logagent is as follows:

```
.
├─conf
│      logagent.conf
├─kafka
│      kafka.go
├─logs
│      my.log
├─main
│      config.go
│      log.go
│      main.go
│      server.go
├─tailf
│      tail.go
├─go.mod
└─go.sum
```
logagent.conf: the configuration file
my.log: the log file being collected
config.go: initializes and reads the configuration; loading here is handled by the config hot-reload package implemented earlier
kafka.go: Kafka operations, including initializing the Kafka connection and sending messages to it
server.go: the tail-related logic that reads the log file and puts each line into a channel
log.go: log handling and serialization
tail.go: reads the log file
main.go: the entry file that runs initialization and calls the server's entry function
An Initial LogAgent Framework
Now that the tail library can read the log and the sarama library can push messages to Kafka, we combine the two so the agent reads the log file and writes to Kafka at the same time.
Create kafka/kafka.go and taillog/tail.go, and start each with an initialization function.
kafka/kafka.go
```go
package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"
)

var (
	client sarama.SyncProducer
)

// Init creates a synchronous Kafka producer.
func Init(addrs []string) (err error) {
	config := sarama.NewConfig()
	// Wait for all in-sync replicas to ack each message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// SyncProducer requires Return.Successes to be true.
	config.Producer.Return.Successes = true

	// addrs is already a []string, so pass it through directly.
	client, err = sarama.NewSyncProducer(addrs, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	return
}
```
taillog/tail.go
```go
package taillog

import (
	"fmt"

	"github.com/hpcloud/tail"
)

var (
	tailObj *tail.Tail
)

// Init starts tailing the given log file.
func Init(filename string) (err error) {
	config := tail.Config{
		ReOpen:    true,                                 // reopen the file if it is recreated (log rotation)
		Follow:    true,                                 // keep reading as the file grows, like `tail -f`
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // start reading from the end of the file
		MustExist: false,                                // don't fail if the file doesn't exist yet
		Poll:      true,                                 // poll for changes instead of using inotify
	}
	tailObj, err = tail.TailFile(filename, config)
	if err != nil {
		fmt.Println("tail file failed, err:", err)
		return
	}
	return
}
```
main.go
```go
package main

import (
	"fmt"

	"logAgent/kafka"
	"logAgent/taillog"
)

func main() {
	// Connect to Kafka.
	err := kafka.Init([]string{"127.0.0.1:29092"})
	if err != nil {
		fmt.Printf("init kafka failed, err:%v\n", err)
		return
	}
	fmt.Println("init kafka success")

	// Start tailing the log file.
	err = taillog.Init("./my.log")
	if err != nil {
		fmt.Printf("init taillog failed, err:%v\n", err)
		return
	}
	fmt.Println("init taillog success")
}
```
With both initialized, the next step is sending the log lines to Kafka.
Create a ReadChan function in taillog/tail.go:
```go
// ReadChan exposes the channel of tailed lines to the caller.
func ReadChan() <-chan *tail.Line {
	return tailObj.Lines
}
```
Create a SendToKafka function in kafka/kafka.go that takes a topic and the data to send from the caller:
```go
// SendToKafka wraps data in a ProducerMessage and sends it synchronously.
func SendToKafka(topic, data string) {
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(data)

	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Printf("send msg failed, err: %v\n", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
```
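A note on the design choice: sarama's NewSyncProducer refuses to start unless Producer.Return.Successes is true, which is why Init sets it. The blocking SendMessage call is what gives us the partition and offset back; sarama also offers an AsyncProducer that trades this simplicity for throughput.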
Create a run function in main.go that does the actual work, and call it from main (this also needs time added to main.go's imports):
```go
// run forwards every line read by taillog to Kafka.
func run() {
	for {
		select {
		case line := <-taillog.ReadChan():
			kafka.SendToKafka("web_log", line.Text)
		default:
			// No new line yet; back off briefly instead of busy-looping.
			time.Sleep(time.Millisecond * 500)
		}
	}
}
```
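The text above says run is called from main but doesn't show it; a minimal sketch of the updated main (unchanged except for the trailing run() call) could look like this:

```go
func main() {
	err := kafka.Init([]string{"127.0.0.1:29092"})
	if err != nil {
		fmt.Printf("init kafka failed, err:%v\n", err)
		return
	}
	fmt.Println("init kafka success")

	err = taillog.Init("./my.log")
	if err != nil {
		fmt.Printf("init taillog failed, err:%v\n", err)
		return
	}
	fmt.Println("init taillog success")

	// Block forever, forwarding tailed lines to Kafka.
	run()
}
```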
Write some data into my.log to test it.
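Any way of appending lines to my.log works; for example, a throwaway generator along these lines (the path and message format here are arbitrary):

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// Appends a timestamped test line to my.log every second.
func main() {
	f, err := os.OpenFile("./my.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	for i := 0; ; i++ {
		fmt.Fprintf(f, "test log line %d at %v\n", i, time.Now().Format(time.RFC3339))
		time.Sleep(time.Second)
	}
}
```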
Improving the Initial LogAgent Framework
Use github.com/astaxie/beego/config to parse the configuration file, and move all the settings into logagent.conf.
logagent.conf
```ini
[logs]
log_level = debug
log_path = E:\\Go\\logagent\\logs\\my.log

[collect]
log_path = E:\\Go\\logagent\\logs\\my.log
topic = nginx_log
chan_size = 100

[kafka]
server_addr = 0.0.0.0:9092
```
The full code:
main.go
The main function loads the configuration and initializes each component:
```go
package main

import (
	"fmt"

	"github.com/astaxie/beego/logs"
	"logagent/kafka"
	"logagent/tailf"
)

func main() {
	fmt.Println("starting")

	// Load the config file first; everything else depends on it.
	filename := "E:\\Go\\logagent\\conf\\logagent.conf"
	err := loadInitConf("ini", filename)
	if err != nil {
		fmt.Printf("load config file failed: %v\n", err)
		panic("load config file failed")
	}

	err = initLogger()
	if err != nil {
		fmt.Printf("init logger failed: %v\n", err)
		panic("init logger failed")
	}
	logs.Debug("logger initialized, config: %v", logConfig)

	err = tailf.InitTail(logConfig.CollectConf, logConfig.chanSize)
	if err != nil {
		logs.Error("init tailf failed:", err)
		return
	}
	logs.Debug("tailf initialized!")

	err = kafka.InitKafka(logConfig.KafkaAddr)
	if err != nil {
		logs.Error("init kafka producer failed:", err)
		return
	}
	logs.Debug("kafka initialized!")

	// serverRun loops forever, shipping log lines to Kafka.
	err = serverRun()
	if err != nil {
		logs.Error("serverRun failed:", err)
	}
	logs.Info("exiting")
}
```
config.go
Loads the settings from logagent.conf:
```go
package main

import (
	"errors"
	"fmt"

	"github.com/astaxie/beego/config"
	"logagent/tailf"
)

var (
	logConfig *Config
)

type Config struct {
	logLevel    string
	logPath     string
	chanSize    int
	KafkaAddr   string
	CollectConf []tailf.CollectConf
}

func loadCollectConf(conf config.Configer) (err error) {
	var c tailf.CollectConf

	c.LogPath = conf.String("collect::log_path")
	if len(c.LogPath) == 0 {
		err = errors.New("invalid collect::log_path")
		return
	}

	c.Topic = conf.String("collect::topic")
	if len(c.Topic) == 0 {
		err = errors.New("invalid collect::topic")
		return
	}

	logConfig.CollectConf = append(logConfig.CollectConf, c)
	return
}

func loadInitConf(confType, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Printf("init config file failed: %v\n", err)
		return
	}
	logConfig = &Config{}

	// Fall back to sensible defaults when a key is missing.
	logConfig.logLevel = conf.String("logs::log_level")
	if len(logConfig.logLevel) == 0 {
		logConfig.logLevel = "debug"
	}
	logConfig.logPath = conf.String("logs::log_path")
	if len(logConfig.logPath) == 0 {
		logConfig.logPath = "E:\\Go\\logagent\\logs\\my.log"
	}

	logConfig.chanSize, err = conf.Int("collect::chan_size")
	if err != nil {
		// Use a default channel size and clear the error.
		logConfig.chanSize = 100
		err = nil
	}

	logConfig.KafkaAddr = conf.String("kafka::server_addr")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("invalid kafka::server_addr")
		return
	}

	err = loadCollectConf(conf)
	if err != nil {
		fmt.Printf("load collect conf failed: %v", err)
		return
	}
	return
}
```
log.go
Initializes the beego logger:
```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

// convertLogLevel maps the level string from the config file
// to a beego/logs level constant.
func convertLogLevel(level string) int {
	switch level {
	case "debug":
		return logs.LevelDebug
	case "warn":
		return logs.LevelWarn
	case "info":
		return logs.LevelInfo
	case "trace":
		return logs.LevelTrace
	}
	return logs.LevelDebug
}

func initLogger() (err error) {
	// beego's file adapter takes its options as a JSON string.
	config := make(map[string]interface{})
	config["filename"] = logConfig.logPath
	config["level"] = convertLogLevel(logConfig.logLevel)
	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("init logger, marshal failed:", err)
		return
	}
	_ = logs.SetLogger(logs.AdapterFile, string(configStr))
	return
}
```
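For the config above, the JSON string handed to logs.SetLogger comes out roughly as {"filename":"E:\\Go\\logagent\\logs\\my.log","level":7}; if I'm reading beego's level constants right, logs.LevelDebug is 7.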
tail.go
Defines the TailObjMgr struct; every line tail picks up is wrapped in a TextMsg and pushed into the channel with tailObjMgr.msgChan <- textMsg:
```go
package tailf

import (
	"fmt"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/hpcloud/tail"
)

// CollectConf describes one log file to collect and its Kafka topic.
type CollectConf struct {
	LogPath string
	Topic   string
}

type TailObj struct {
	tail *tail.Tail
	conf CollectConf
}

type TextMsg struct {
	Msg   string
	Topic string
}

type TailObjMgr struct {
	tailsObjs []*TailObj
	msgChan   chan *TextMsg
}

var (
	tailObjMgr *TailObjMgr
)

// GetOneLine blocks until a collected line is available.
func GetOneLine() (msg *TextMsg) {
	msg = <-tailObjMgr.msgChan
	return
}

func InitTail(conf []CollectConf, chanSize int) (err error) {
	if len(conf) == 0 {
		err = fmt.Errorf("invalid log collect conf: %v", conf)
		return
	}
	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize),
	}
	// Start one tail goroutine per configured log file.
	for _, v := range conf {
		fmt.Println(v)
		tails, errTail := tail.TailFile(v.LogPath, tail.Config{
			ReOpen:    true,
			Follow:    true,
			Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
			MustExist: false,
			Poll:      true,
		})
		if errTail != nil {
			err = errTail
			fmt.Println("tail file failed:", err)
			return
		}
		obj := &TailObj{
			conf: v,
			tail: tails,
		}
		tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)

		go readFromTail(obj)
	}
	return
}

func readFromTail(tailObj *TailObj) {
	for {
		msg, ok := <-tailObj.tail.Lines
		if !ok {
			logs.Warn("tail file closed, reopen, filename:%s\n", tailObj.tail.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		textMsg := &TextMsg{
			Msg:   msg.Text,
			Topic: tailObj.conf.Topic,
		}
		tailObjMgr.msgChan <- textMsg
	}
}
```
server.go
server.go adds a sendToKafka function, which takes the msg that tail.go put into the channel and calls the kafka package's SendToKafka (in kafka.go) to send it to Kafka:
```go
package main

import (
	"time"

	"github.com/astaxie/beego/logs"
	"logagent/kafka"
	"logagent/tailf"
)

func serverRun() (err error) {
	// Pull lines out of the channel forever and ship them to Kafka.
	for {
		msg := tailf.GetOneLine()
		err = sendToKafka(msg)
		if err != nil {
			logs.Error("send to kafka failed, err:%v", err)
			time.Sleep(time.Second)
			continue
		}
	}
}

func sendToKafka(msg *tailf.TextMsg) (err error) {
	// Propagate the send error so serverRun can back off and retry.
	err = kafka.SendToKafka(msg.Msg, msg.Topic)
	return
}
```
kafka.go
Defines InitKafka, which initializes the Kafka producer, and SendToKafka, which sends a message to Kafka:
```go
package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

var (
	client sarama.SyncProducer
)

func InitKafka(addr string) (err error) {
	config := sarama.NewConfig()
	// Wait for all in-sync replicas to ack; random partitioner;
	// SyncProducer requires Return.Successes = true.
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	client, err = sarama.NewSyncProducer([]string{addr}, config)
	if err != nil {
		logs.Error("init kafka producer failed:", err)
		return
	}
	logs.Debug("init kafka producer success, addr:", addr)
	return
}

func SendToKafka(data, topic string) (err error) {
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(data)

	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		logs.Error("send message failed, err:%v, data:%v, topic:%v", err, data, topic)
		return
	}
	logs.Debug("send success, pid:%v, offset:%v, topic:%v", pid, offset, topic)
	return
}
```
Development environment:
My environment is Go 1.14 with Go modules, so to get the project running quickly, run go mod init in the project folder; the dependencies are downloaded automatically at build time.
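For reference, the resulting go.mod should look roughly like this; the module name matches the logagent import paths above, but the dependency versions here are illustrative rather than pinned from the original project:

```
module logagent

go 1.14

require (
	github.com/Shopify/sarama v1.19.0
	github.com/astaxie/beego v1.12.1
	github.com/hpcloud/tail v1.0.0
)
```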
Build and run main:
E:\Go\logagent\main>go build
Note: to run it directly from GoLand, you need to run the four .go files in the main package together.

Once it's running, the Kafka consumer picks up the messages and lines written to my.log are shipped successfully.
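To check the consumer side, a minimal sarama consumer sketch like the following works. Assumptions not in the original: the broker is reachable at 127.0.0.1:9092, the topic is nginx_log from logagent.conf, and only partition 0 is read:

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// A nil config means sarama's defaults.
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Println("failed to start consumer:", err)
		return
	}
	defer consumer.Close()

	// Read partition 0 from the newest offset, mirroring the agent's
	// tail-from-end behavior.
	pc, err := consumer.ConsumePartition("nginx_log", 0, sarama.OffsetNewest)
	if err != nil {
		fmt.Println("failed to consume partition:", err)
		return
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		fmt.Printf("partition:%d offset:%d value:%s\n", msg.Partition, msg.Offset, msg.Value)
	}
}
```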

