Just Do IT !

Golang实战之海量日志收集系统(七)logTransfer之从kafka中获取日志信息

字数统计: 1.2k阅读时长: 6 min
2020/03/29 Share

在前六章中已经完成了日志收集的logAgent端, 接下来需要将日志数据写入到Kafka中, 然后将数据落地到Elastciseartch

项目架构图:

在这里插入图片描述

图源自:https://www.cnblogs.com/zhaof/p/8910761.html

项目逻辑图:

在这里插入图片描述

Elastciseartch与Kinaba下载与安装

Elastciseartch与Kinaba下载与安装, 参考我这篇博客

使用Golang控制Elasticsearch

环境版本:

1
2
3
go 1.14
elasticsearch-7.6.1
kibana-7.6.1-windows-x86_64

我们使用第三方库github.com/olivere/elastic

6.X包路径为github.com/olivere/elastic 7.X的包路径为github.com/olivere/elastic/v7,根据自己安装的elasticsearch版本进行区分

elastic库的官方文档https://olivere.github.io/elastic/上面有更加详细的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
)

type Tweet struct {
User string
Message string
}

func main() {
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))
if err != nil {
fmt.Println("connect es error", err)
return
}

fmt.Println("conn es succ")

tweet := Tweet{User: "haohan", Message: "This is a test"}
_, err = client.Index().
Index("twitter").
Id("1").
BodyJson(tweet).
Do(context.Background())
if err != nil {
// Handle error
panic(err)
return
}

fmt.Println("insert succ")
}

插入成功

1
2
conn es succ
insert succ

在这里插入图片描述

kafka消费示例

前面介绍了写数据到kafka中,数据需要从消息队列里面取出最终落地到es中。简单介绍下从kafka中获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"github.com/Shopify/sarama"
"strings"
"sync"
)

func main() {

consumer, err := sarama.NewConsumer(strings.Split("localhost:9092",","), nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
partitionList, err := consumer.Partitions("nginx_log")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
fmt.Println(partitionList)

// 按照分区来消费
for partition := range partitionList {
pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose()
go func(pc sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()

}(pc)
}
time.Sleep(time.Hour)
_ = consumer.Close()
}

1
2
Partition:0, Offset:34208, Key:, Value:This is a test!
Partition:0, Offset:34209, Key:, Value:

已经能够从kafka中拿到日志信息了

下面开始logTransfer服务的开发,数据已经到kafka了,现在要从kafka中消费数据,然后写到es中,logTransfer做的就是这个工作

初识化logTransfer配置

项目目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
├─config
│ logTransfer.conf

├─es
│ elasticsearch.go

├─logs
│ my.log

└─main
kafka.go
config.go
log.go
main.go

main.go中填写初始化信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"github.com/astaxie/beego/logs"
"logCollect/logTransfer/kafka"
)

func main() {
// 初始化配置
err := InitConfig("ini", "E:\\Go\\logCollect\\logTransfer\\config\\logTransfer.conf")
if err != nil {
panic(err)
return
}
logs.Debug("初始化配置成功")

//初始化日志模块
err = initLogger(logConfig.LogPath, logConfig.LogLevel)
if err != nil {
panic(err)
return
}
logs.Debug("初始化日志模块成功")

// 初始化Kafka
err = kafka.InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
if err != nil {
logs.Error("初始化Kafka失败, err:", err)
return
}
logs.Debug("初始化Kafka成功")
}

logTransfer.conf

1
2
3
4
5
6
7
8
9
10
[logs]
log_level = debug
log_path = "E:\\Go\\logCollect\\logTransfer\\logs\\my.log"

[kafka]
server_addr = localhost:9092
topic = nginx_log

[elasticsearch]
addr = http://localhost:9200/

config.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"fmt"
"github.com/astaxie/beego/config"
)

type LogConfig struct {
KafkaAddr string
KafkaTopic string
EsAddr string
LogPath string
LogLevel string
}

var (
logConfig *LogConfig
)

func InitConfig(confType string, filename string) (err error) {
conf, err := config.NewConfig(confType, filename)
if err != nil {
fmt.Printf("初始化配置文件出错:%v\n", err)
return
}
// 导入配置信息
logConfig = &LogConfig{}
// 日志级别
logConfig.LogLevel = conf.String("logs::log_level")
if len(logConfig.LogLevel) == 0 {
logConfig.LogLevel = "debug"
}
// 日志输出路径
logConfig.LogPath = conf.String("logs::log_path")
if len(logConfig.LogPath) == 0 {
logConfig.LogPath = "E:\\Go\\logCollect\\logTransfer\\logs\\my.log"
}

// Kafka
logConfig.KafkaAddr = conf.String("kafka::server_addr")
if len(logConfig.KafkaAddr) == 0 {
err = fmt.Errorf("初识化Kafka addr失败")
return
}
logConfig.KafkaTopic = conf.String("kafka::topic")
if len(logConfig.KafkaAddr) == 0 {
err = fmt.Errorf("初识化Kafka topic失败")
return
}

// Es
logConfig.EsAddr = conf.String("elasticsearch::addr")
if len(logConfig.EsAddr) == 0 {
err = fmt.Errorf("初识化Es addr失败")
return
}
return
}

log,go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)

func convertLogLevel(level string) int {

switch level {
case "debug":
return logs.LevelDebug
case "warn":
return logs.LevelWarn
case "info":
return logs.LevelInfo
case "trace":
return logs.LevelTrace
}
return logs.LevelDebug
}

func initLogger(logPath string, logLevel string) (err error) {

config := make(map[string]interface{})
config["filename"] = logPath
config["level"] = convertLogLevel(logLevel)
configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("初始化日志, 序列化失败:", err)
return
}
_ = logs.SetLogger(logs.AdapterFile, string(configStr))

return
}

kafka.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package kafka

import (
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
"strings"
)

type KafkaClient struct {
client sarama.Consumer
addr string
topic string
}

var (
kafkaClient *KafkaClient
)

func InitKafka(addr string, topic string) (err error) {

kafkaClient = &KafkaClient{}
consumer, err := sarama.NewConsumer(strings.Split(addr, ","), nil)
if err != nil {
logs.Error("启动Kafka消费者错误: %s", err)
return nil
}
kafkaClient.client = consumer
kafkaClient.addr = addr
kafkaClient.topic = topic
return
}

运行main函数, 构建测试
输出日志在logs/my.log

1
2
2020/03/28 17:30:02.744 [D]  初始化日志模块成功
2020/03/28 17:30:02.778 [D] 初始化Kafka成功

初始化成功, 下面将使用Kafka消费数据, 并且将获取到的数据存入ES中

CATALOG
  1. 1. Elastciseartch与Kinaba下载与安装
  2. 2. 使用Golang控制Elasticsearch
  3. 3. kafka消费示例
  4. 4. 初识化logTransfer配置